我们正在构建一个kafka流应用程序,作为大型微服务架构的一部分。我们希望对向后不兼容的格式更改具有弹性,并引入了隔离主题。我们找不到库提供的任何内容,所以我们只需"手动"尝试反序列化记录,并在失败时将其转发到隔离主题,就可以滚动自己的记录。
很简单。
现在是隔离事件的回放。这应该在外部触发(比如REST调用(,如果反序列化成功,则将事件转移到下一个主题。我们能利用kafka流来执行这样的按需操作吗?直观地说,它应该像builder.stream(quarantined).to(nextTopic)
一样简单。
看看处理器API,似乎不可能停止处理。模糊阻止不是一种选择,因为这会影响在同一StreamThread
中运行的其他任务,而拥有另一个KafkaStream应用程序似乎有些过头了。我想避免手动编码消费者->生产者循环,所以我也在考虑akka流kafka,但这听起来也有点过头了。。。
有什么想法吗?
如果我正确理解你的问题:每当触发外部REST调用时,你都想启动一个单独的流应用程序来读取隔离的主题B,尝试用一些更新的格式反序列化数据,如果成功,将其推送到"好数据"主题C,当它到达主题B的末尾时,这个流应用程序应该会自动停止。
在这种情况下,假设您对最终主题C没有排序要求,您可以在内部使用"停止标志",KafkaStreams调用线程可以阻止并等待该标志,而KafkaStream内部流线程可以设置为取消阻止调用线程,以最终调用"KafkaStreams.close(("。例如,您可以利用标点符号函数,该函数检查自上一个标点符号周期以来是否没有新数据,这表明我们可能已经用完了主题B中的所有数据,在这种情况下,设置标志。
在Streams自己的基准测试代码中可以找到一个示例:https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java#L657-L673,但请注意,它不是基于标点符号,而是基于处理逻辑,检查当前处理的数据内容,因为它确切地知道"最后一条记录"是什么样子。但使用这种关闭锁存器的总体想法是相同的。