所以 redis 5.0 引入了一个名为 Streams 的新功能。它们似乎非常适合分发用于进程间通信的消息:
- 它们在可靠性方面超过了 PUB/SUB 事件消息传递的能力:PUB/SUB 是即发即弃的,不能保证收件人会收到消息
- Redis 列表有些低级,但仍可以使用。但是,流针对性能和上述用例进行了优化。
但是,由于此功能非常新,因此几乎没有任何Python(甚至通用的redis(手册,我真的不知道如何使流系统适应我的用例。
我想有一个发布者程序将消息推送到流并包含收件人信息(如recipient: "user1"
(。然后,我将有几个接收过程,所有进程都应该检查新的流消息并比较它们是否是目标收件人。如果是,他们应该处理邮件并将其标记为已处理(已确认(。
但是,我真的不了解消费者组,待定状态等概念。谁能给我一个真实世界的例子来说明我的小伪代码?
sender.py
db = Redis(...)
db.the_stream.add({"recipient": "user1", "task": "be a python"})
recipient.py(将有许多实例运行它们,每个实例都具有唯一的收件人 ID(
recipient_id = "user1" # you get the idea...
db = Redis(...)
while True:
message = db.the_stream.blocking_read("$") # "$" somehow means: just receive new messages
if message.recipient == recipient_id:
perform_task(message.task)
message.acknowledge() # let the stream know it was processed
else:
pass # well, do nothing here since it's not our message. Another recipient instance should do the job.```
使用您给出的示例和伪代码,让我们想象一下:
recipient.user1
每分钟收到 60 条消息perform_task()
方法需要 2 秒才能执行。
这里将发生的事情是显而易见的:新消息传入和处理之间的延迟只会随着时间的推移而增长,离"实时处理"越来越远。
system throughput = 30 messages/minute
要解决此问题,您可能需要为user1
创建一个使用者组。在这里,您可以并行运行 4 个不同的 python 进程,所有 4 个进程都加入同一组中以进行user1
。现在,当收到一条消息user1
4 名工作人员中的一名将拿起它并perform_task()
。
system throughput = 120 message/minute
在您的示例中,message.acknowledge()
实际上并不存在,因为您的流读取器是单独的(XREAD 命令(。
如果它是一个组,则消息的确认变得至关重要,这就是 redis 知道其中一个组成员确实处理了该消息的方式,因此它可能会"继续"(它可能会忘记该消息正在等待确认的事实(。使用组时,有一点服务器端逻辑来确保每条消息都传递到一个使用者组工作线程一次(XGROUPREAD 命令(。客户端完成后,它会发出该消息的确认(XACK 命令(,以便服务器端"使用者组缓冲区"可以删除它并继续。
想象一下,如果一个工人死了,从来没有承认过这个信息。对于使用者组,您可以注意这种情况(使用 XPENDING 命令(并对其执行操作,例如,重试在另一个使用者中处理相同的消息。
当您不使用组时,Redis 服务器不需要"继续","确认"变为 100% 客户端/业务逻辑。