如何在没有'changelog topic'的情况下加入KStream和KTable。



我加入了KStream和KTable。下面是我的代码。

StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressTable, 
(order, address) -> order + " send to " + address)
.to(ORDER_JOIN_STREAM);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();

执行完此代码后,将创建一个新的主题。

$ ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
order-join-application3-address-store-name-changelog
address
order

如果没有"~~~changlog"主题,我该如何执行代码。

  • broker版本:0.11.0.2
  • 流版本:2.7.0

我不确定这是否可能,因为

在Kafka Streams DSL中,聚合操作的输入流可以是KStream或KTable,但输出流将始终是KTable。

从Kafka聚合中提取,该聚合也使用类似于Kafka join中的Streams DSL。基本上Kafka join所做的是在KTable上执行KStream的查找;变更日志流";从这里。

KStream KTable联接始终是非窗口联接。他们允许你根据KTable(变更日志流(执行表查找接收来自KStream的新记录(记录流(。一个例子用例是用最新的用户简档信息(KTable(。

相关内容

  • 没有找到相关文章

最新更新