我想将一个项目分解为小型微服务。我一直在考虑不同的工具,比如gRPC,但我认为芹菜可能对我来说是一个更好的选择。如果我错了,芹菜不是一个好的选择,请告诉我为什么。
我到底想做什么
例如,我有三个不同的服务器,A、B和C;A";必须能够向";B"B";一定一直在听我做了什么
- 我创建了两个RabbitMQ服务器;A"";B";另一个用于";B";以及";C">
- 我使用send_task将任务发送到服务器。一个任务是从";A";至";B"B";已经在侦听并等待来自"的请求;A";。"上还有另一个send_task;B";它发送一个请求到";C"C";也已经在运行并等待来自"的请求;B">
- 我使用backend='rpc'返回结果
当一个请求由";A"B";接收该请求并发送_ task到"0";C"C";也可以毫无问题地接收请求。问题是返回结果。我尝试了不同的方法,但大部分都是runtimeError: Never call result.get() within a task Celery
错误。
有人有解决方案吗
此特定错误表明在任务中使用result.get()
是一种可能导致死锁的不良做法。
如果你知道自己在做什么,你可以尝试allow_join_result,但这可能会导致性能问题:
with allow_join_result():
task.get()
您也可以等待任务成功:
timeout = 0.1
while not result.ready():
time.sleep(timeout)
ret = result.get()
如果我是你,我会重新思考整件事。
首先,我认为没有理由使用多个RabbitMQ服务器。相反,您的员工将订阅三个不同的队列-A、B和C(我会给他们起更有意义的名字(。需要在您称之为"服务器"的服务器上执行的任务;A";只需发送到A队列,其他所有队列也是如此。在服务器A上运行的Celery工作程序仅订阅队列A(celery ... worker -n node_A -Q A
(。类似的情况是工作人员在服务器B和C上运行。
一旦你有了这种基础设施,你就可以开始重构你编排这些任务的方式。当你发现自己需要在任务中调用.get((时,你可能会立即认为自己做错了。正确的方法是使用Celery工作流,但它需要我们进行某种心理调整。-我是通过艰苦的方式学会的。。。这样想吧,与其强制性地说你需要执行什么,不如告诉芹菜运行各种任务,当你需要用结果做一些事情时,你可以在另一个任务中收集它们,等等。实际上,大多数工作流都会有一个最终任务,它收集作为工作流一部分执行的所有其他任务的结果,并对结果进行最终处理,所以没有显式调用.get((。这也使得易于分发(和扩展(成为可能。
这里我回答我自己的问题,就我而言,这个答案不起作用。所以我读了更多,我更新了如下代码:
timeout = 0.1
while not ML_task.ready():
time.sleep(timeout)
result = ML_task.get(disable_sync_subtasks=False)
希望能有所帮助。