spring-kafka修复了事件迁移实现



我有几个服务,其中一个是真相来源(SOT(。卡夫卡是他们的信息中介。我需要不时地生成一组将在其他服务中使用和应用的事件。所谓的固定事件迁移。

我的夹具文件示例:

EntityUpdated (topicA)
- id
- relation
RelationUpdated (topicB)
- id
- relation

类是在应用事件之后在数据库中具有投影的spring实体。

class Entity: Model {
id
val relation: Relation
}
class Relation: Model {
id
}

当前使用者实现以任意方式读取主题,使用者可以在topicA之前从topicB读取数据,当由于相关实体还不存在而无法应用消息时,我会得到案例。(RelationUpdated在EntityUpdated之前消耗(。

我有几个想法来解决它:

  1. 暂停所有分区/主题,然后按指定顺序继续。所以我可以避免RelationUpdated consumed before EntityUpdated的情况。然后,在恢复所有主题的所有分区后,我可以继续以任意方式工作。我不喜欢切换,但它看起来很有效。

  2. 将无法应用于所谓死信队列的消息放入队列,并尝试一次又一次地重播,直到它们全部应用为止。

也许有人做了类似的事情。我很高兴知道你的想法。

这不是Apache Kafka的设计初衷。更明确地说,信息传递是关于独立性和分离关注点的。我的意思是,一个源中的消息不应该影响其他消息,而你想用暂停和DLT做什么对其他消息真的很有影响。

我建议您研究一下Spring Integration及其聚合器模式的实现:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator.这个想法是:你的EntityRelationid上有相关性。因此,独立于谁先到达,聚合器将等待第二部分,只有在第二部分之后,才会为下一个处理步骤释放这两个部分。有了这个集成解决方案,就不需要像消费者暂停这样的事情,也不需要通过DLT进行额外的代理回合。

如果Spring Integration对您来说太复杂了,您可以考虑使用本地Map来实现自己的解决方案,以添加基于第一个到达的(computeIfAbsent()(,并返回第二个到达(computeIfPresent()(。

最新更新