我有一个kafka主题HrEvents
,其中包含许多Hire
、Fire
、Quit
、Promotion
和Demotion
消息。每个HR事件消息具有employee_id
属性(也是用于分区的密钥(和data
属性,它们可以包含关于HR事件的任意细节。
问题是,我的应用程序需要处理的各种data
Blob没有得到很好的记录,而且在任何时候,都有可能消耗应用程序无法处理的HR事件
对于每个employee_id
,应用程序都要按顺序处理所有HR事件,这一点非常重要。同样重要的是,在影响一个employee_id
的此类处理失败之后,所有其他employee_id
的所有HR事件都可以继续。
失败的HR事件以及同一employee_id
的所有后续HR事件应发布到死信队列中。一旦修补了应用程序,并添加了对另一种未记录形式的data
blob的支持,就可以从死信队列中使用这些HR事件。
我意识到,这还需要在消费者中维护某种形式的密钥黑名单,其中存储了employee_id
,死信队列中至少有一条未使用的HR事件消息。
是否有现有的解决方案/java库允许我实现这个问题的解决方案?
请原谅我的无知,但我正在努力寻找上述问题的解决方案,但我怀疑我可能没有用正确的行话进行搜索。请随意教育我。
听起来你应该能够利用Kafka Streams来实现这一点。
你的死信队列可以建立一个KTable,它形成了一种黑名单。当新事件出现在原始主题中时,您将针对现有ID的KTable执行查找,并将传入事件附加到该ID 尚未处理的事件的值列表中