我正在使用芹菜在Hadoop上运行长时间运行的任务。每个任务在Hadoop上执行一个Pig脚本,该脚本运行约30分钟-2小时。
我目前的Hadoop设置有4个队列a,b,c和默认值。所有任务当前都由单个工作线程执行,该工作线程将作业提交到单个队列。
我想再添加 3 个将作业提交到其他队列的工作线程,每个队列一个工作线程。
问题是队列目前是硬编码的,我希望为每个工作人员制作这个变量。
我搜索了很多,但我找不到一种方法来为每个芹菜工人传递不同的队列值并在我的任务中访问它。
我就这样开始我的芹菜工人。
celery -A app.celery worker
我希望在命令行本身中传递一些额外的参数并在我的任务中访问它,但芹菜抱怨它不理解我的自定义参数。
我计划通过设置--concurrency=3
参数在同一主机上运行所有工作线程。这个问题有什么解决方案吗?
谢谢!
编辑
目前的情况是这样的。每次我尝试执行任务print_something说tasks.print_something.delay()
它只打印队列 C。
@celery.task()
def print_something():
print "C"
我需要让工作人员根据我在启动它们时传递给他们的值打印一个可变字母。
@celery.task()
def print_something():
print "<Variable Value Per Worker Here>"
希望这对某人有所帮助。
这个问题需要解决多个问题。
第一步涉及在芹菜中添加对自定义参数的支持。如果不这样做,芹菜会抱怨它不理解参数。
由于我正在使用 Flask 运行芹菜,因此我像这样初始化芹菜。
def configure_celery():
app.config.update(
CELERY_BROKER_URL='amqp://:@localhost:5672',
RESULT_BACKEND='db+mysql://root:@localhost:3306/<database_name>'
)
celery = Celery(app.import_name, backend=app.config['RESULT_BACKEND'],
broker=app.config['CELERY_BROKER_URL'])
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
我调用此函数来初始化芹菜并将其存储在名为芹菜的变量中。
celery = configure_celery()
要添加自定义参数,您需要执行以下操作。
def add_hadoop_queue_argument_to_worker(parser):
parser.add_argument(
'--hadoop-queue', help='Hadoop queue to be used by the worker'
)
下面使用的芹菜是我们从上述步骤中获得的芹菜。
celery.user_options['worker'].add(add_hadoop_queue_argument_to_worker)
下一步是使此参数在辅助角色中可访问。为此,请按照以下步骤操作。
class HadoopCustomWorkerStep(bootsteps.StartStopStep):
def __init__(self, worker, **kwargs):
worker.app.hadoop_queue = kwargs['hadoop_queue']
通知芹菜使用此类创建辅助角色。
celery.steps['worker'].add(HadoopCustomWorkerStep)
任务现在应该能够访问变量。
@app.task(bind=True)
def print_hadoop_queue_from_config(self):
print self.app.hadoop_queue
通过在命令行上运行辅助角色来验证它。
celery -A app.celery worker --concurrency=1 --hadoop-queue=A -n aworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=B -n bworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=C -n cworker@%h
celery -A app.celery worker --concurrency=1 --hadoop-queue=default -n defaultworker@%h
我通常做的是,在另一个脚本(比如 manage.py(中启动工作线程(任务未执行(后,我添加带有参数的命令来启动特定任务或具有不同参数的任务。
在 manager.py:
from tasks import some_task
@click.command
def run_task(params):
some_task.apply_async(params)
这将根据需要启动任务。