我想要实现的编写一个调度程序,使用数据库在不同的时间调度类似的任务。
同样,我使用芹菜节拍,下面的代码片段会给一个想法
try:
reader = MongoReader()
except:
raise
try:
tasks = reader.get_scheduled_tasks()
except:
raise
celerybeat_schedule = dict()
for task in tasks:
celerybeat_schedule[task["task_id"]] =dict()
celerybeat_schedule[task["task_id"]]["task"] = task["task_name"]
celerybeat_schedule[task["task_id"]]["args"] = (task,)
celerybeat_schedule[task["task_id"]]["schedule"] = get_task_schedule(task)
app.conf.update(BROKER_URL=rabbit_mq_endpoint, CELERY_TASK_SERIALIZER='json', CELERY_ACCEPT_CONTENT=['json'], CELERYBEAT_SCHEDULE=celerybeat_schedule)
这是三个步骤—从数据存储读取所有任务-创建一个字典,芹菜调度程序,由所有具有属性的任务填充,task_name(将运行的方法),参数(传递给方法的数据),schedule(存储何时运行)-更新芹菜配置
预期的场景假设所有条目运行相同的芹菜任务名称,只是打印,有相同的调度每5分钟运行一次,有不同的参数指定打印的内容,让我们说db有
task name , parameter , schedule
regular_print , Hi , {"minutes" : 5}
regular_print , Hello , {"minutes" : 5}
regular_print , Bye , {"minutes" : 5}
我希望这些每5分钟打印一次以打印所有三个
会发生什么只有一个Hi, Hello, Bye打印(可能是随机的,肯定不是顺序的)
请帮忙,提前感谢:)
可以使用版本4的芹菜解决这个问题。样品类似于我的工作…也可以在文档中找到芹菜版本4
#taking address and user-pass from environment(you can mention direct values)
ex_host_queue = os.environ["EX_HOST_QUEUE"]
ex_port_queue = os.environ["EX_PORT_QUEUE"]
ex_user_queue = os.environ["EX_USERID_QUEUE"]
ex_pass_queue = os.environ["EX_PASSWORD_QUEUE"]
broker= "amqp://"+ex_user_queue+":"+ex_pass_queue+"@"+ex_host_queue+":"+ex_port_queue+"//"
#celery initialization
app = Celery(__name__,backend=broker, broker=broker)
app.conf.task_default_queue = 'scheduler_queue'
app.conf.update(
task_serializer='json',
accept_content=['json'], # Ignore other content
result_serializer='json'
)
task = {"task_id":1,"a":10,"b":20}
##method to update scheduler
def add_scheduled_task(task):
print("scheduling task")
del task["_id"]
print("adding task_id")
name = task["task_name"]
app.add_periodic_task(timedelta(minutes=1),add.s(task), name = task["task_id"])
@app.task(name='scheduler_task')
def scheduler_task(data):
print(str(data["a"]+data["b"]))