我有一个Flask应用程序,我使用芹菜(v5.1.2)与SQLAlchemy运行后台任务与我的工人。
这是我如何创建我的芹菜工作器…
backend_url = 'mysql://{}:{}@{}/{}'.format(username, password, hostname, db_name)
broker_url = 'sqla+' + backend_url
db_backend_url = 'db+' + backend_url
celery = Celery(
app.import_name,
backend=backend_url,
broker_url=broker_url,
result_backend=db_backend_url,
cache_backend=db_backend_url,
task_serializer='json',
result_serializer='json',
include=['my_blueprints.profile'])
可以很好地初始化和运行我的任务。当我尝试使用
读取结果的状态时,就会出现问题:bg_task = current_app.celery.AsyncResult(MY_TASK_ID)
bg_task_state = bg_task.state
我得到错误:_pickle.UnpicklingError: pickle data was truncated
时,试图运行bg_task.state
我认为这与任务返回1MB的大文件有关。当任务运行时,可以使用上面的两行代码成功读取任务状态,但在任务完成时失败。
task_serializer和result_serializer都设置为'json'
,所以我不明白为什么会发生这种情况。
在默认后端,芹菜使用BLOB
列将结果存储在celery_taskmeta
表中,该表在mysql中限制为64K。在你的芹菜日志的某个地方,你可能也看到一个截断警告从mysql当结果被写入到那个表。
芹菜的结果并不是用来传递大文件的,而更多的是为了保存一些关于任务结果的最小细节。
你没有给出很多关于你的用例的细节,但一般来说,写这么大的二进制blob到你的数据库是一种气味,或者至少是等待某一天发生的头痛。
一个不错的解决方法是将文件写入文件系统或将其上传到您喜欢的弹性存储区域,然后在任务的结果中返回文件名。这里的问题是要确保您的工作节点能够访问与需要结果的节点相同的文件系统。
您也可以通过修改celery_taskmeta
表来允许在表中存储MEDIUMBLOB
或LONGBLOB
,但我的直觉是,您最终会希望无论如何都不要将文件存储在RDBMS中。您必须确保每次从头部署应用程序时都进行了更改。