使用芹菜节拍在多个时间(使用不同参数)调度任务,但任务只运行一次(使用随机参数)



我想要实现的编写一个调度程序,使用数据库在不同的时间调度类似的任务。

同样,我使用芹菜节拍,下面的代码片段会给一个想法

try:
    reader = MongoReader()
except:
    raise
try:
    tasks = reader.get_scheduled_tasks()
except:
    raise
celerybeat_schedule = dict()
for task in tasks:
    celerybeat_schedule[task["task_id"]] =dict()
    celerybeat_schedule[task["task_id"]]["task"] = task["task_name"]
    celerybeat_schedule[task["task_id"]]["args"] = (task,)
    celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task)
app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule)

这是三个步骤—从数据存储读取所有任务-创建一个字典,芹菜调度程序,由所有具有属性的任务填充,task_name(将运行的方法),参数(传递给方法的数据),schedule(存储何时运行)-更新芹菜配置

预期的场景假设所有条目运行相同的芹菜任务名称,只是打印,有相同的调度每5分钟运行一次,有不同的参数指定打印的内容,让我们说db有

task name     , parameter , schedule
regular_print , Hi        , {"minutes" : 5}
regular_print , Hello        , {"minutes" : 5}
regular_print , Bye        , {"minutes" : 5}

我希望这些每5分钟打印一次以打印所有三个

会发生什么只有一个Hi, Hello, Bye打印(可能是随机的,肯定不是顺序的)

请帮忙,提前感谢:)

可以使用版本4的芹菜解决这个问题。样品类似于我的工作…也可以在文档中找到芹菜版本4

    #taking address and user-pass from environment(you can mention direct values) 
    ex_host_queue = os.environ["EX_HOST_QUEUE"]
    ex_port_queue = os.environ["EX_PORT_QUEUE"]
    ex_user_queue = os.environ["EX_USERID_QUEUE"]
    ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"]
    broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//"
    #celery initialization
    app = Celery(__name__,backend=broker, broker=broker)
    app.conf.task_default_queue = 'scheduler_queue'
    app.conf.update(
        task_serializer='json',
        accept_content=['json'],  # Ignore other content
        result_serializer='json'
    )
task = {"task_id":1,"a":10,"b":20}
##method to update scheduler
def add_scheduled_task(task):
    print("scheduling task")
    del task["_id"]
    print("adding task_id")
    name = task["task_name"]
    app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])    
@app.task(name='scheduler_task')
def scheduler_task(data):
    print(str(data["a"]+data["b"]))

最新更新