我很难为webhook调度系统找到一个可靠且可扩展的解决方案。
当前系统使用带有webhook队列的RabbitMQ
(我们称之为events
),这些webhook被消耗和调度。这个系统工作了一段时间,但现在有一些问题:
- 如果系统用户生成的事件太多,它将占用队列,导致其他用户长时间无法接收到webhook
- 如果我将所有事件划分为多个队列(通过URL哈希),这会降低出现第一个问题的可能性,但当非常繁忙的用户访问同一个队列时,这种情况仍会不时发生
- 如果我试图将每个URL放入其自己的队列中,那么挑战就是动态地创建/分配消费者到这些队列中。就
RabbitMQ
文档而言,API在筛选非空队列或未分配消费者的队列方面非常有限 - 就
Kafka
而言,正如我从阅读它的所有内容中了解到的那样,在单个分区的范围内情况是相同的
那么,问题是-是否有更好的方法/系统来实现这一目的?也许我错过了一个非常简单的解决方案,可以让一个用户不干扰另一个用户?
提前感谢!
您可以尝试几个rabbitmq功能来缓解您的问题(不需要完全删除):
-
使用公共随机交换在多个队列中拆分事件。它将减轻事件的大高峰,并将工作分派给几个消费者。
-
为队列设置一些TTL策略。这样,如果处理速度不够快,Rabbitmq可能会将事件重新发布到另一组队列(例如,通过另一个私有随机交换)。
你可能有几个";循环";事件的数量、变化的配置(即每个周期的周期数和TTL值)。您的第一个周期尽可能地处理新事件,在随机交换下通过几个队列来减少峰值。如果它无法足够快地处理事件,则会将事件转移到另一个具有专用队列和消费者的周期。
通过这种方式,您可以确保新事件有更好的更改可以快速处理,因为它们总是在第一个周期中发布(而不是在另一个用户的一堆旧事件之后)。
如果您需要订单,那么您必须依赖用户输入。
但在卡夫卡的世界里,有几件事值得一提;
- 您可以使用
Transactions
实现exactly-once
交付,这允许您构建类似于常规AMQP的系统 - Kafka支持按键分区。这允许您保持相同密钥(在您的情况下为userId)的处理顺序
- 可以通过调整所有生产者、服务器和消费者端(批量大小、空中请求等)来提高吞吐量。有关更多参数,请参阅Kafka文档)
- Kafka支持消息压缩,这减少了网络流量并增加了吞吐量(只是为LZ4等快速压缩算法消耗了更多的CPU功率)
分区在您的场景中是最重要的。您可以增加分区以在同一时间处理更多的消息。您的消费者可以和同一消费者gorup中的分区一样多。即使您在达到分区数后进行扩展,您的新使用者也将无法读取,并且将保持未分配状态。
与常规AMQP服务不同,Kafka在您阅读后不会删除消息,只是标记consumer-gorup-id的偏移量。这可以让你同时做一些事情。就像在单独的过程中计算实时用户计数一样。
所以,我不确定这是否是解决这个问题的正确方法,但这就是我想到的。
先决条件:带有重复数据消除插件的RabbitMQ
所以我的解决方案包括:
g:events
队列-让我们称之为parent
队列。此队列将包含需要处理的所有child
队列的名称。也许它可以被其他一些机制(比如Redis排序的Set之类的)取代,但那时你必须自己实现ack逻辑g:events:<url>
-有child
队列。每个队列只包含需要发送到该url
的事件
将webhook有效负载发布到RabbitMQ时,将实际数据发布到child
队列,然后将child
队列的名称附加到parent
队列。重复数据消除插件不允许同一个child
队列发布两次,这意味着只有一个消费者可以接收该child
队列进行处理。
所有使用者都在使用parent
队列,在收到消息后,他们开始使用消息中指定的child
队列。在child
队列为空之后,您确认parent
消息并继续前进。
该方法允许对哪些child
队列进行非常精细的控制。如果某个child
队列占用了太多时间,只需ack
parent
消息,并将相同的数据重新发布到parent
队列的末尾。
我知道这可能不是最有效的方法(不断向parent
队列发布也会有一些开销),但它就是这样