如何避免腌制芹菜的任务



我的场景如下:我有一个大型机器学习模型,由一群工人计算。本质上,工人计算他们自己的模型部分,然后与结果交换,以保持模型的全局一致状态。

所以,每一个芹菜任务都会计算它自己的工作部分。但这意味着,任务不是无状态的,这就是我的问题:如果我说some_task.delay( 123, 456 ),实际上我在这里发送了两个整数!

我正在发送整个任务状态,它是在芹菜的某个地方腌制的。这种状态通常约为200MB:-(

我知道,在Celery中选择一个像样的序列化程序是可能的,但我的问题是NOT如何只抓取任何数据,这可能是任务中的任务。如何pickle任务的参数?以下是来自celener/app/task.py的引文:

def __reduce__(self):
# - tasks are pickled into the name of the task only, and the reciever
# - simply grabs it from the local registry.
# - in later versions the module of the task is also included,
# - and the receiving side tries to import that module so that
# - it will work even if the task has not been registered.
mod = type(self).__module__
mod = mod if mod and mod in sys.modules else None
return (_unpickle_task_v2, (self.name, mod), None)

我只是不想发生这种事。有没有一个简单的方法来解决这个问题,或者我只是被迫建造自己的芹菜(想象起来很丑陋)?

不要为此使用芹菜结果后端。使用单独的数据存储。

虽然你可以只使用Task.ignore_result,但这意味着你失去了跟踪任务状态等的能力。

最好的解决方案是使用一个存储引擎(例如Redis)作为结果后端。您应该设置一个单独的存储引擎(Redis的一个单独实例,或者类似MongoDB的东西,根据您的需要)来存储实际数据。

通过这种方式,您仍然可以看到任务的状态,但大型数据集不会影响芹菜的操作。

切换到JSON序列化程序可能会减少序列化开销,具体取决于生成的数据的格式。然而,它无法解决通过结果后端放入过多数据的根本问题。

结果后端可以处理相对少量的数据——一旦超过一定的限制,就开始阻止其主要任务的正确操作——任务状态的通信。

我建议更新您的任务,使其返回包含有用元数据的轻量级数据结构(例如,促进任务之间的协调),并将"真实"数据存储在专用存储解决方案中。

您必须按照文档中的说明定义任务的忽略结果:

任务ignore_result

不存储任务状态。请注意,这意味着您不能使用AsyncResult来检查任务是否准备好,或者获取其返回值。

这有点离谱,但仍然如此。

据我所知,这里正在发生什么。您有几个进程,它们与进程间通信并行地进行繁重的计算。所以,与其让芹菜不满意,你可以:

  • 使用zmq进行进程间通信(仅发送必要的数据)
  • 使用supervisor来管理和运行进程(尤其是numproc将有助于运行多个相同的工作程序)

虽然不需要自己编写芹菜,但需要编写一些代码。

相关内容

  • 没有找到相关文章

最新更新