根据芹菜的结果到达工人的路线



我最近一直在使用 Storm,其中包含一个名为 fields grouping 的概念(afaict 与 Celery 中的group()概念无关),其中具有特定键的消息将始终路由到同一个工作人员。

只是为了更清楚地定义我的意思,这里是来自风暴维基。

字段

分组:流按 中指定的字段进行分区 分组。例如,如果流按"用户 ID"分组 字段,具有相同"user-id"的元组将始终转到相同的任务, 但是具有不同"user-id"的元组可能会转到不同的任务。

例如,从单词列表中读取,我想将以a,b,c开头的单词路由到仅工作进程,d,e,f路由到另一个进程,等等。

想要这样做的原因可能是因为我希望一个进程负责数据库读取/写入一组相同的数据,这样进程之间就不会有竞争条件。

我正在尝试找出在 Celery 中实现这一目标的最佳方法。

到目前为止,我最好的解决方案是为每个"组"(例如 letters.a、letters.d)使用一个队列,并确保工作进程的数量与队列的数量完全匹配。缺点是它必须为每个工作线程只运行一个进程,以及各种情况,例如工人死亡或添加/删除工人时。

我是芹菜的新手,所以如果我引用的概念不正确,请纠正我。

这涉及到一些胶水,但这是概念:

有一种方法可以使用 CELERY_WORKER_DIRECT 将任务直接发送给不同的工作人员。将其设置为 True将为每个工作人员创建一个工艺路线。

我定期使用 celery.current_app.control.inspect().ping() 确定活动工作线程或确定活动主机。 例如:

>>> hosts = sorted(celery.current_app.control.inspect().ping().keys())
['host5', 'host6']

当我需要按键路由时,我会对值进行哈希处理,然后按工作器数量取模。这将均匀地分配任务,并将相同的密钥保留给同一工作人员。例如:

>>> host_id = hash('hello') % len(hosts)
1
>>> host = hosts[host_id]
'host6'

然后在执行任务时,我只需指定交换和路由密钥,如下所示:

my_task.apply_async(exchange='C.dq', routing_key=host)

有一些缺点:

  1. 据我所知,在工作线程上设置> 1 的并发性将使每个进程都消耗相同的进程,从而否定整个练习。不幸的解决方法是将其保持在 1。
  2. 如果工作线程在 ping()apply_async 之间出现故障,消息将被发送到不存在的路由。解决此问题的方法是捕获超时、重新断言可用主机、重新哈希和重新发送。

芹菜的要点是你不需要管理单个工人。

如果需要任务获取数据的所有权,则任务应在运行开始时取得所有权。

如果你想单独管理工人,可能不要使用芹菜。您可能应该自己编写工作线程,并且只使用消息队列(或者可能是风暴)。为正确的工作使用正确的工具。

相关内容

  • 没有找到相关文章

最新更新