我正在重构我的代码使用芹菜工人。
在我使用argparse通过命令行args之前。
例如。
if __name__ == "__main__":
parser = argparse.ArgumentParser(description='Node')
parser.add_argument('--environment', action="store", default='local', help="env e.g. production of development")
environment = arg_options.environment
但是现在我遇到了这个错误。
celery -A tasks worker --loglevel=info --environment local
celery: error: no such option: --environment
我该如何添加?
我不想使用环境变量。
e.g export environment=development
芹菜工人不执行您的__main__
。
如果要添加其他命令行选项,则可以使用app.user_options
,但是请注意,它使用optparse
模块,而不是argparse
。
有关更多信息,请参见文档中的此部分:
http://docs.celeryproject.org/en/latest/userguide/extgend.html#preload-options
通过查看源代码,我已经找到了一种处理此操作的方法。
celery.py add:
from celery import bootsteps
from celery.bin import Option
....
app.user_options['worker'].add(
Option('--server', dest='api_server', default='127.0.0.1',
help='API server to use.')
)
app.conf['API_SERVER'] = '127.0.0.1'
class ConfigBootstep(bootsteps.Step):
def __init__(self, worker, api_server=None, **options):
app.conf['API_SERVER'] = api_server
app.steps['worker'].add(ConfigBootstep)
然后在文件中持有您的任务:
from celery import current_app
...
@shared_task()
def roboscope():
API_SERVER = current_app.conf.get('API_SERVER', '127.0.0.1')
我尝试通过在导入模块时解决API_Server作为模块全局变量,但它不起作用,因为它还为时过早。由于我的任务非常密集,因此运行很多次都不会害处。
可以使用工人选项。
例如,我必须在其中一个任务中初始化mongo数据库
from celery import bootsteps
from celery.bin import Option
import mongoConfig
.....
app = Celery('scraper')
app.user_options['worker'].add(
Option('--mongo', dest='is_mongo_required', default=None, help='Mongo Required')
)
class CustomArgs(bootsteps.Step):
def __init__(self, worker, is_mongo_required=None, **options):
# store the api authentication
if is_mongo_required is not None and is_mongo_required[0] == "true":
print("Mongo initialization required")
mongoConfig.init(required=True)
app.steps['worker'].add(CustomArgs)
app.config_from_object('celeryConfig')
命令执行
celery worker -A scraper --concurrency=4 --loglevel=info -Q abc,bcd --mongo=true