Airflow scheduler throws an exception on startup when parallelism is set to a large value



I'm new to Airflow and I'm trying to use it to build a data pipeline, but it keeps running into exceptions. My airflow.cfg looks like this:

executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost/airflow
sql_alchemy_pool_size = 5
parallelism = 96
dag_concurrency = 96
worker_concurrency = 96
max_threads = 96
broker_url = postgresql+psycopg2://airflow:airflow@localhost/airflow
result_backend = postgresql+psycopg2://airflow:airflow@localhost/airflow

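When I start airflow webserver -p 8080 in one terminal and then airflow scheduler in another, the scheduler fails with the exception below. It only fails when I set parallelism to a larger value; otherwise it works fine. This may be machine-specific, but at least we know the failure is triggered by parallelism. I have already tried running 1000 Python processes on my machine and that works fine, and I have configured Postgres to allow up to 500 database connections, but I still get the error.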

[2019-11-20 12:15:00,820] {dag_processing.py:556} INFO - Launched DagFileProcessorManager with pid: 85050
Process QueuedLocalWorker-18:
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 811, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/Users/edward/.local/share/virtualenvs/avat-utils-JpGzQGRW/lib/python3.7/site-packages/airflow/executors/local_executor.py", line 111, in run
    key, command = self.task_queue.get()
  File "<string>", line 2, in get
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 815, in _callmethod
    self._connect()
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/managers.py", line 802, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 492, in Client
    c = SocketClient(address)
  File "/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/multiprocessing/connection.py", line 619, in SocketClient
    s.connect(address)
ConnectionRefusedError: [Errno 61] Connection refused
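To see the mechanism the traceback points at, here is a minimal standard-library sketch (not Airflow's actual code; run_pool and queued_worker are hypothetical names): a manager-backed queue shared by a pool of worker processes, each doing key, command = task_queue.get() as on line 111 of local_executor.py. As the traceback shows, a forked worker has no connection to the manager yet, so its first get() opens a new socket connection to the manager process; a larger parallelism therefore means more workers connecting to that one manager. It assumes the fork start method, which was the macOS default for the Python 3.7 in the traceback. Raising the parallelism argument is one way to check whether this bare pattern fails on a given machine; it is not guaranteed to reproduce the ConnectionRefusedError.

```python
import multiprocessing as mp

# Assumption: fork start method, as on macOS with Python 3.7.
CTX = mp.get_context("fork")


def queued_worker(task_queue, result_queue):
    """Hypothetical, simplified stand-in for a QueuedLocalWorker loop."""
    while True:
        # In a freshly forked process the first proxy call connects to the
        # manager server over a socket (the _connect step in the traceback).
        key, command = task_queue.get()
        if key is None:  # sentinel: shut this worker down
            break
        result_queue.put((key, "ran: " + command))


def run_pool(parallelism, tasks):
    """Start `parallelism` workers, feed them tasks, and collect results."""
    with CTX.Manager() as manager:
        task_queue = manager.Queue()
        result_queue = manager.Queue()
        workers = [
            CTX.Process(target=queued_worker, args=(task_queue, result_queue))
            for _ in range(parallelism)
        ]
        for w in workers:
            w.start()
        for key, command in tasks:
            task_queue.put((key, command))
        for _ in workers:
            task_queue.put((None, None))  # one sentinel per worker
        for w in workers:
            w.join()
        results = {}
        for _ in range(len(tasks)):
            key, output = result_queue.get()
            results[key] = output
        return results


if __name__ == "__main__":
    print(run_pool(4, [(i, "echo %d" % i) for i in range(8)]))
```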

Thanks

Update: I tried running it from PyCharm and it works fine there, but in the terminal it sometimes fails and sometimes doesn't.

I ran into the same problem. It turned out I had set max_threads=10 in airflow.cfg together with LocalExecutor. Switching to max_threads=2 fixed it.

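I found out a few days ago that Airflow actually starts all of the parallel processes at startup. I thought the max_* settings and parallelism were capacities, but they are the number of processes that run at startup. So it looks like the problem is caused by the machine running out of resources.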
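The difference between a capacity and a startup process count can be illustrated with a generic standard-library sketch (hypothetical names, not Airflow code, assuming the fork start method): a pool that starts all of its workers immediately holds that many live processes even though no task has been submitted, so the setting costs resources the moment the pool starts rather than only under load.

```python
import multiprocessing as mp

# Assumption: fork start method, as on macOS with Python 3.7.
CTX = mp.get_context("fork")


def idle_worker(stop_event):
    # The worker does no work; it simply stays alive until told to stop.
    stop_event.wait()


def start_pool(parallelism):
    """Hypothetical eager pool: start every worker up front."""
    stop_event = CTX.Event()
    workers = [
        CTX.Process(target=idle_worker, args=(stop_event,))
        for _ in range(parallelism)
    ]
    for w in workers:
        w.start()
    return stop_event, workers


def live_worker_count(parallelism):
    """Count workers alive right after startup, before any task exists."""
    stop_event, workers = start_pool(parallelism)
    try:
        return sum(w.is_alive() for w in workers)
    finally:
        stop_event.set()
        for w in workers:
            w.join()


if __name__ == "__main__":
    print(live_worker_count(8))
```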

Latest update