事件驱动的发布-订阅模型



我正在处理一个需求,即流程(比如生产者)需要向可变数量的流程(比如消费者)发送单向消息。

发布-订阅模型似乎对此很好,因为消费者将订阅来自生产者的消息。我尝试使用ZeroMQ来实现这一点。

然而,我有一些问题:

  1. 消费者必须不断地轮询信息。当有新消息时,我会通知消费者。

  2. 生产者队列有可能被填满。我希望生产者根据某些条件从队列中删除消息(比如删除超过5秒的消息,或者删除已读取5次的消息)。

  3. 由于消费者正在轮询,并且消息不会从队列中删除,因此消费者在收到新消息之前会看到重复的消息。我希望每个新消息只通知消费者一次。

我知道我可能使用了错误的模型(发布-订阅可能不合适)。我曾考虑过使用请求回复,但这不起作用,因为生产者不想跟踪消费者的数量。

有人能提出一个好的替代方案吗?

我建议在Producer和Consumer之间使用Broker的Push-Pull模型。

  1. 任何新消息都应通知Broker
  2. 消费者将听取代理的通知(保留表以跟踪成功/失败。因此,重试时将避免重复)
  3. 一旦#2完成,消费者就可以从生产者(源)中提取数据,并将ack发送给代理以确定成功/失败

希望这能帮助

DDS(数据分发服务)中间件支持您想要实现的目标,而且更容易实现。

直接回答您的问题:

  1. DDS支持侦听器机制,您的订阅者不需要持续轮询。

  2. DDS具有良好的QoS设置,可以防止发布服务器队列被填满。您可以使用History QoS表示"只保留队列中最近的10个样本",也可以使用Lifespan QoS表示"仅保留最后10秒内发布的样本"。

  3. 同样,您可以使用DDS侦听器机制,并且对于每个新样本,您将只收到一次通知。无需投票。

目前有两种开源实现。

  • OpenSplice:https://github.com/OpenSplice/opensplice
  • OpenDDShttp://www.opendds.org/

您需要多个生产商吗?如果没有,您可以使用PUSH/PULL而不是PUB/SUB。

使用PUSH/PULL,您可以拥有任意数量的消费者(他们是模型的PULL侧)。写入PUSH端点的所有消息都以循环方式在所有连接的消费者之间进行分发。这也确保了两个消费者不会收到相同的消息。

正如您所描述的,如果两个或多个消费者订阅了相同的"前缀",那么以消费者作为SUB端点,您最终可能会向多个消费者传递相同的消息(假设这在您的模型中会是一个问题)。

假设"prefix"是传递给sock.setsockopt(ZMQ_SUBSCRIBE, "prefix", ...); 的字符串

尝试使用JMS提供程序或AMQP提供程序。这些有一些你正在寻找的主题:

  1. 向订阅者推送通知。

  2. 消息的生存时间属性,如果消息未在TTL内使用,则允许删除消息或将其放入死信队列。

  3. 一次性通知-取决于您的配置。

请注意,一旦发生网络故障,可能导致消息丢失或重复的消息,只有消息才具有边缘条件。。。你选择。

就使用哪种供应商而言。RabbitMQ在AMQP中很受欢迎。对于JMS,有任意数量的专有产品或开源实现。

最新更新