Airflow scheduler crashes when running a DAG



When I manually trigger a DAG from the dashboard, the Airflow scheduler crashes.

executor=DaskExecutor

airflow version = 1.10.7

sql_alchemy_conn = postgresql://airflow:airflow@localhost:5432/airflow

python version = 3.6
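
For context, the DAG being triggered is minimal. The actual /workflows/dags/helloWorld.py was not shared, so the following is a reconstruction assumed from the dag_id and task_id that appear in the log below:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Reconstructed /workflows/dags/helloWorld.py (assumption: the real file may
# differ; the log only shows dag_id 'hello_world' and task_id 'dummy_task')
dag = DAG(
    dag_id='hello_world',
    start_date=datetime(2020, 8, 1),
    schedule_interval=None,
)

dummy_task = DummyOperator(task_id='dummy_task', dag=dag)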

The crash log is:

[2020-08-20 07:01:49,288] {scheduler_job.py:1148} INFO - Sending ('hello_world', 'dummy_task', datetime.datetime(2020, 8, 20, 1, 31, 47, 20630, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 2 and queue default
[2020-08-20 07:01:49,288] {base_executor.py:58} INFO - Adding to queue: ['airflow', 'run', 'hello_world', 'dummy_task', '2020-08-20T01:31:47.020630+00:00', '--local', '--pool', 'default_pool', '-sd', '/workflows/dags/helloWorld.py']
/mypython/lib/python3.6/site-packages/airflow/executors/dask_executor.py:63: UserWarning: DaskExecutor does not support queues. All tasks will be run in the same cluster
'DaskExecutor does not support queues. '
distributed.protocol.pickle - INFO - Failed to serialize <function DaskExecutor.execute_async.<locals>.airflow_run at 0x12057a9d8>. Exception: Cell is empty
[2020-08-20 07:01:49,292] {scheduler_job.py:1361} ERROR - Exception when executing execute_helper
Traceback (most recent call last):
  File "/mypython/lib/python3.6/site-packages/distributed/worker.py", line 843, in dumps_function
    result = cache[func]
KeyError: <function DaskExecutor.execute_async.<locals>.airflow_run at 0x12057a9d8>

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mypython/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 38, in dumps
    result = pickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
AttributeError: Can't pickle local object 'DaskExecutor.execute_async.<locals>.airflow_run'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/mypython/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1359, in _execute
    self._execute_helper()
  File "/mypython/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1420, in _execute_helper
    if not self._validate_and_run_task_instances(simple_dag_bag=simple_dag_bag):
  File "/mypython/lib/python3.6/site-packages/airflow/jobs/scheduler_job.py", line 1482, in _validate_and_run_task_instances
    self.executor.heartbeat()
  File "/mypython/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 130, in heartbeat
    self.trigger_tasks(open_slots)
  File "/mypython/lib/python3.6/site-packages/airflow/executors/base_executor.py", line 154, in trigger_tasks
    executor_config=simple_ti.executor_config)
  File "/mypython/lib/python3.6/site-packages/airflow/executors/dask_executor.py", line 70, in execute_async
    future = self.client.submit(airflow_run, pure=False)
  File "/mypython/lib/python3.6/site-packages/distributed/client.py", line 1279, in submit
    actors=actor)
  File "/mypython/lib/python3.6/site-packages/distributed/client.py", line 2249, in _graph_to_futures
    'tasks': valmap(dumps_task, dsk3),
  File "/mypython/lib/python3.6/site-packages/toolz/dicttoolz.py", line 83, in valmap
    rv.update(zip(iterkeys(d), map(func, itervalues(d))))
  File "/mypython/lib/python3.6/site-packages/distributed/worker.py", line 881, in dumps_task
    return {'function': dumps_function(task[0]),
  File "/mypython/lib/python3.6/site-packages/distributed/worker.py", line 845, in dumps_function
    result = pickle.dumps(func)
  File "/mypython/lib/python3.6/site-packages/distributed/protocol/pickle.py", line 51, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
  File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 101, in dumps
    cp.dump(obj)
  File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 540, in dump
    return Pickler.dump(self, obj)
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 722, in save_function
    *self._dynamic_function_reduce(obj), obj=obj
  File "/mypython/lib/python3.6/site-packages/cloudpickle/cloudpickle_fast.py", line 659, in _save_reduce_pickle5
    dictitems=dictitems, obj=obj
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 610, in save_reduce
    save(args)
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 751, in save_tuple
    save(element)
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/usr/local/opt/python@3.6/Frameworks/Python.framework/Versions/3.6/lib/python3.6/pickle.py", line 476, in save
    f(self, obj) # Call unbound method with explicit self
  File "/mypython/lib/python3.6/site-packages/dill/_dill.py", line 1146, in save_cell
    f = obj.cell_contents
ValueError: Cell is empty
[2020-08-20 07:01:49,302] {helpers.py:322} INFO - Sending Signals.SIGTERM to GPID 11451
[2020-08-20 07:01:49,303] {dag_processing.py:804} INFO - Exiting gracefully upon receiving signal 15
[2020-08-20 07:01:49,310] {dag_processing.py:1379} INFO - Waiting up to 5 seconds for processes to exit...
[2020-08-20 07:01:49,318] {helpers.py:288} INFO - Process psutil.Process(pid=11451, status='terminated') (11451) terminated with exit code 0
[2020-08-20 07:01:49,319] {helpers.py:288} INFO - Process psutil.Process(pid=11600, status='terminated') (11600) terminated with exit code None
[2020-08-20 07:01:49,319] {scheduler_job.py:1364} INFO - Exited execute loop

I'm running this on macOS Catalina, in case that helps isolate the error.
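
For what it's worth, the first error in the chain can be reproduced without Airflow or Dask at all: DaskExecutor.execute_async submits a function (airflow_run) that is defined inside the method itself, and the standard-library pickler refuses any such local function. A minimal sketch of just that step (the names here are illustrative stand-ins):

import pickle

def execute_async(command):
    # stand-in for the local helper that DaskExecutor.execute_async creates
    def airflow_run():
        print(command)  # closes over `command`
    # stdlib pickle cannot serialize functions defined inside another function
    return pickle.dumps(airflow_run)

try:
    execute_async(['airflow', 'run', 'hello_world', 'dummy_task'])
except AttributeError as exc:
    print(exc)  # Can't pickle local object 'execute_async.<locals>.airflow_run'

Dask's serializer then falls back to cloudpickle, which normally can handle such closures; in the traceback above that fallback itself dies with "ValueError: Cell is empty", and that is what takes the scheduler down.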

I believe this issue may be the one you're experiencing.

Looking at that ticket, it appears to still be open: a fix has been made, but it has not yet been officially released.

This pull request contains the fix for the issue linked above; you could try building the Airflow stack locally from there and see whether it resolves the problem for you.

This started happening with newer versions of Dask's downstream dependencies. Pinning the versions fixes it:

pip uninstall cloudpickle distributed

pip install cloudpickle==1.4.1 distributed==2.17.0
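
After the reinstall, it's worth confirming that Python actually picks up the pinned versions (a quick sanity check, nothing more):

import cloudpickle
import distributed

# both should match the pins above
print(cloudpickle.__version__)  # expect 1.4.1
print(distributed.__version__)  # expect 2.17.0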

These are the problematic versions:

cloudpickle==1.6.0
distributed==2.26.0

I'm running Airflow 1.10.10 in Docker and use the same image with Dask 2.13.0.
