在气流中使用芹菜

  • 本文关键字:芹菜 celery airflow
  • 更新时间 :
  • 英文 :


我是气流的新手,现在我发现气流正在使用芹菜来安排它的任务。要运行气流,我需要运行命令"气流工作者"来启动芹菜。但是,这里总是有一个错误。由于我在互联网上搜索过,大多数问题都发生在用户自己编写 celery.py 上。我只通过启动气流使用芹菜。所以它有点不同。 有人可以帮助我吗?下面是该错误的屏幕截图。

airflow@linux-test:~$ airflow worker
[2018-06-22 07:29:04,068] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-06-22 07:29:04,125] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-06-22 07:29:04,146] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
-------------- celery@linux-test v4.2.0 (windowlicker)
---- **** -----
--- * ***  * -- Linux-4.15.0-22-generic-x86_64-with-Ubuntu-18.04-bionic 2018-06-22 07:29:04
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f2267122310
- ** ---------- .> transport:   amqp://airflow:**@localhost:5672/airflow
- ** ---------- .> results:     postgresql://airflow:**@localhost:5432/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> default          exchange=default(direct) key=default
[2018-06-22 07:29:04,630] {__init__.py:57} INFO - Using executor CeleryExecutor
[2018-06-22 07:29:04,689] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/Grammar.txt
[2018-06-22 07:29:04,715] {driver.py:124} INFO - Generating grammar tables from /usr/lib/python2.7/lib2to3/PatternGrammar.txt
Starting flask
[2018-06-22 07:29:04,858] {_internal.py:88} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
[2018-06-22 07:29:06,122: ERROR/ForkPoolWorker-1] Pool process <celery.concurrency.asynpool.Worker object at 0x7f22648c8e10> error: TypeError("Required argument 'object' (pos 1) not found",)
Traceback (most recent call last):
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 289, in __call__
sys.exit(self.workloop(pid=pid))
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 347, in workloop
req = wait_for_job()
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 447, in receive
ready, req = _receive(1.0)
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/pool.py", line 419, in _recv
return True, loads(get_payload())
File "/home/airflow/.local/lib/python2.7/site-packages/billiard/common.py", line 107, in pickle_loads
return load(BytesIO(s))
TypeError: Required argument 'object' (pos 1) not found
[2018-06-22 07:29:06,127: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:18839 exited with 'exitcode 1'

卸载 librabbitmq 对我有用:pip uninstall librabbitmq.我不太明白为什么,但显然,该库上的一些优化使事情失败。这是我在某些网站上找到的答案(我不得不翻译页面,因此我无法很好地理解解决方案( 希望对你有帮助

相关内容

  • 没有找到相关文章

最新更新