我正在试验一个使用 Redis 作为代理的 Celery 工人。
这是我对芹菜工人的测试代码:
from celery import Celery
app = Celery('tasks', broker='redis://xxxxx.net:6379/0')
@app.task
def nextexec(payload):
print(payload)
使用redis-cli
,我正在运行以下命令将值插入celery
队列(由 Celery 自动创建(
RPUSH celery somekey 'somevalue'
但是我的工作线程在执行查询时系统地崩溃,我得到了一个Unrecoverable error: JSONDecodeError
.它似乎接收None
而不是要解码的 JSON 字符串。
知道我需要执行什么 Redis 查询或我需要对这个(但很简单(工作线程脚本进行哪些更改吗?
Celery 使用序列化程序在客户端和工作线程之间传输数据。每条消息都需要序列化,并且都有一个描述用于对其进行编码的序列化方法的content_type标头。
下面是使用 json 序列化的示例消息。
{'body': 'W1sxXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIjogbnVsbCwgImNoYWluIjogbnVsbCwgImNob3JkIjogbnVsbH1d',
'content-encoding': 'utf-8',
'content-type': 'application/json',
'headers': {'argsrepr': '(1,)',
'eta': None,
'expires': None,
'group': None,
'id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc',
'kwargsrepr': '{}',
'lang': 'py',
'origin': 'gen5339@pavilion',
'parent_id': None,
'retries': 0,
'root_id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc',
'task': 't.wait',
'timelimit': [None, None]},
'properties': {'body_encoding': 'base64',
'correlation_id': '5ce9a8d8-41d7-47a4-9074-beedabd88dcc',
'delivery_info': {'exchange': '', 'routing_key': 'celery'},
'delivery_mode': 2,
'delivery_tag': '0177eb65-344e-4b1c-ab5f-8e2f5d75b8d3',
'priority': 0,
'reply_to': 'a5c611b8-18b3-3bbb-b598-c3757f06c4fd'}}
芹菜工人需要接收指定格式的消息。你不能把一些值推送给经纪人(redis(并期望芹菜执行它。
使用 python 将任务排队。
from mymodule import nextexec
payload ='some payload'
nextexec.delay(payload)