我正在尝试使用celery
和本文档运行一个简单的工作流。我正在使用链按顺序运行任务,工作流如下
提取一个文件,标记它,并将一个文档的句子标记的JSON转储加载到另一个(新(文件中。在文件夹中的文件列表上迭代工作流
以下是我的代码:-
文件夹结构
celery-pipeline/
├── celeryapp.py
├── celeryconfig.py
├── data/
├── output/
└── tasks.py
celeryapp.py
from celery import Celery
app = Celery()
app.config_from_object('celeryconfig')
celeryconfig.py
imports = ('tasks',)
broker_url = 'redis://localhost:6379/0'
result_backend = 'db+postgresql://celery_user:celery_user@127.0.0.1:5432/celery_db'
task_ignore_result = False
task_track_started = True
task_default_queue = 'default'
task_default_rate_limit = '20/s'
task_time_limit = 7200
worker_pool_restarts = True
任务.py
import os
import json
import spacy
import logging
from datetime import datetime, timedelta
from celeryapp import app
sp = spacy.load('en_core_web_sm')
@app.task(bind=True)
def extract(self, filename):
file_path = os.path.join(os.getcwd(), 'data', filename)
doc = open(file_path).read()
print('Extract called')
return doc
@app.task(bind=True)
def transform_tokenize_doc(self, doc:str):
sentences = []
for sent in sp(doc).sents:
sentences.append(str(sent).strip())
return sentences
@app.task(bind=True)
def load(self, filename, *args):
with open(os.path.join(os.getcwd(), 'output', filename), 'a+') as file:
file.write(json.dumps(args, indent=4))
if __name__ == '__main__':
tasks = []
for filename in os.listdir(os.path.join(os.getcwd(), 'data'))[:10]:
print(f'filename is {filename}')
etl = (extract.s(filename) | transform_tokenize_doc.s() | load.s(filename)).apply_async()
tasks.append(etl)
for task in tasks:
task.get()
在根文件夹-celery-pipeline/
中运行celery -A tasks worker --loglevel=info
时,我收到以下错误:-
Traceback (most recent call last):
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 41, in __get__
return obj.__dict__[self.__name__]
KeyError: 'control'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/home/ubuntu/Documents/projects/celery-venv/bin/celery", line 8, in <module>
sys.exit(main())
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/__main__.py", line 15, in main
sys.exit(_main())
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bin/celery.py", line 213, in main
return celery(auto_envvar_prefix="CELERY")
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/click/decorators.py", line 21, in new_func
return f(get_current_context(), *args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bin/base.py", line 132, in caller
return f(ctx, *args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bin/worker.py", line 326, in worker
**kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/worker.py", line 99, in __init__
self.setup_instance(**self.prepare_args(**kwargs))
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/worker.py", line 139, in setup_instance
self.blueprint.apply(self, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 211, in apply
step.include(parent)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 379, in include
inc, ret = self._should_include(parent)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 335, in _should_include
return True, self.create(parent)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/components.py", line 238, in create
prefetch_multiplier=w.prefetch_multiplier,
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 331, in instantiate
return instantiate(name, *args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/utils/imports.py", line 44, in instantiate
return symbol_by_name(name)(*args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 212, in __init__
self.blueprint.apply(self, **dict(worker_options or {}, **kwargs))
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/bootsteps.py", line 205, in apply
step = S(parent, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/consumer/control.py", line 25, in __init__
self.box = (pidbox.gPidbox if self.is_green else pidbox.Pidbox)(c)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/worker/pidbox.py", line 28, in __init__
self.node = c.app.control.mailbox.Node(
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/objects.py", line 43, in __get__
value = obj.__dict__[self.__name__] = self.__get(obj)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/app/base.py", line 1230, in control
return instantiate(self.control_cls, app=self)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/utils/imports.py", line 44, in instantiate
return symbol_by_name(name)(*args, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/imports.py", line 56, in symbol_by_name
module = imp(module_name, package=package, **kwargs)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/importlib/__init__.py", line 126, in import_module
return _bootstrap._gcd_import(name[level:], package, level)
File "<frozen importlib._bootstrap>", line 994, in _gcd_import
File "<frozen importlib._bootstrap>", line 971, in _find_and_load
File "<frozen importlib._bootstrap>", line 955, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/celery/app/control.py", line 9, in <module>
from kombu.matcher import match
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/matcher.py", line 132, in <module>
for ep, args in entrypoints('kombu.matchers'):
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/kombu/utils/compat.py", line 93, in entrypoints
for ep in importlib_metadata.entry_points().get(namespace, [])
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/__init__.py", line 865, in entry_points
return SelectableGroups.load(eps).select(**params)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/__init__.py", line 340, in load
ordered = sorted(eps, key=by_group)
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/__init__.py", line 863, in <genexpr>
dist.entry_points for dist in unique(distributions())
File "/home/ubuntu/Documents/projects/celery-venv/lib/python3.6/site-packages/importlib_metadata/_itertools.py", line 16, in unique_everseen
k = key(element)
AttributeError: 'PathDistribution' object has no attribute 'name'
我试图在stackoverflow上搜索这个错误,但找不到关于这个错误的太多见解。如果能给一些提示,我将不胜感激
问题可能与importlib-metadata
有关。尝试在venv中添加一个要求,将其限制为早期版本。在类似的情况下,importlib-metadata<3.4.0
对我有效
spacy
的下一个版本(v3.0.6(应该通过删除importlib-metadata
作为一个需求来解决这个问题(至少如果它只与spacy
相关(。
我在spacy v3.0.6中也遇到了同样的错误。importlib元数据==3.4.0对我有用。importlib元数据==4.0.0 出现错误
spacyv3.4.1
也出现了同样的问题。importlib-metadata==3.4.0
在创建虚拟环境时也为我解决了这个问题。有趣的是,我的importlib-metadata
预先默认为2.0.0,尽管我的python版本是3.9.10
。