如何在卡夫卡中同步多个日志?



>假设我有 2 种类型的日志,它们有一个公共字段"uid",如果包含 uid 的这两个日志的日志到达,我想输出日志,就像连接一样,Kafka 可以吗?

是的,绝对是。查看Kafka Streams,特别是DSL API。它像这样:

StreamsBuilder builder = new StreamsBuilder();
KStream<byte[], Foo> fooStream = builder.stream("foo");
KStream<byte[], Bar> barStream = builder.stream("bar");
fooStream.join(barStream,
(foo, bar) -> {
foo.baz = bar.baz;
return foo;
},
JoinWindows.of(1000))
.to("buzz");

这个简单的应用程序使用两个输入主题("foo"和"bar"),将它们连接并写入主题"buzz"。由于流是无限的,因此在加入两个流时,您需要指定一个加入窗口(以上为 1000 毫秒),这是相应流上两条消息之间的相对时间差,以使它们有资格加入。

下面是一个更完整的示例:https://github.com/confluentinc/kafka-streams-examples/blob/4.0.0-post/src/main/java/io/confluent/examples/streams/PageViewRegionLambdaExample.java

这是文档:https://docs.confluent.io/current/streams/developer-guide/dsl-api.html。您会发现可以执行许多不同类型的联接:

  • https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KStream.html
  • https://kafka.apache.org/10/javadoc/org/apache/kafka/streams/kstream/KTable.html

请务必注意,尽管上述示例将确定性地同步流(如果重置并重新处理拓扑,则每次都会获得相同的结果),但并非 Kafka 流中的所有联接操作都是确定性的。从版本 1.0.0 及更早版本开始,大约一半不是确定性的,可能取决于从底层主题分区消耗的数据顺序。具体而言,内部KStream(KStream和所有KTable-KTable联接是确定性的。其他联接(如所有KStream联接(KTable联接和左/外KStream联接)KStream联接是不确定的,取决于使用者使用的数据顺序。如果要将拓扑设计为可重新处理,请记住这一点。如果使用这些非确定性操作,则当拓扑实时运行时,事件到达时的顺序将产生一个结果,但如果重新处理拓扑,则可能会得到另一个结果。另请注意,像KStream#merge()这样的操作也不会产生确定性的结果。有关此问题的详细信息,请参阅为什么我的 Kafka 流拓扑无法正确重播/重新处理?和这个邮件列表帖子

相关内容

  • 没有找到相关文章

最新更新