我有一个名为source的主题,其中包含两种消息流,A和B。我写了一个消耗该主题的Kafka-streams应用程序,找到了A和B使用相同的相关ID并将它们汇总到新的消息C中,并将其放在输出主题目标
上有时没有B(或Wise VersA(的A会列入源主题。我已经创建了一个可查询的状态商店,因此我可以查看这些悬而未决的消息,但是现在我想从中间主题中删除一条特定的消息。我猜这只是用正确的键(我拥有的(收到消息的问题,然后将身体纳入中间话题。问题是最好的方法是什么?
- 产生一个特殊的清晰消息来来源,这将导致汇总消息变为null
- 直接用null数据将消息直接写入中间主题
- 其他方式,也许Kafka-streams已经有一个API呼吁?
奖励问题:如果我知道我不想让消息坐在中间主题中超过6个月,我可以指示Kafka-streams使用6M保留来创建中间主题,或者我应该手动创建主题我运行该应用程序?
奖励问题:如果我知道我不想让消息坐在中间主题中超过6个月,我可以指示Kafka-streams使用6M保留来创建中间主题,或者我应该手动创建主题我运行该应用程序?
是的,您可以设置保留时间:
kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config retention.ms=16070400000
或创建主题时:
kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --if-not-exists --config retention.ms=16070400000 --topic my_topic