我加入了KStream和KTable。下面是我的代码。
StreamsBuilder builder = new StreamsBuilder();
KTable<String, String> addressTable = builder.table(ADDRESS_TABLE);
KStream<String, String> orderStream = builder.stream(ORDER_STREAM);
orderStream.join(addressTable,
(order, address) -> order + " send to " + address)
.to(ORDER_JOIN_STREAM);
KafkaStreams streams;
streams = new KafkaStreams(builder.build(), props);
streams.start();
执行完此代码后,将创建一个新的主题。
$ ./kafka-topics.sh --zookeeper localhost:2181 --list
__consumer_offsets
order-join-application3-address-store-name-changelog
address
order
如果没有"~~~changlog"主题,我该如何执行代码。
- broker版本:0.11.0.2
- 流版本:2.7.0
我不确定这是否可能,因为
在Kafka Streams DSL中,聚合操作的输入流可以是KStream或KTable,但输出流将始终是KTable。
从Kafka聚合中提取,该聚合也使用类似于Kafka join中的Streams DSL。基本上Kafka join所做的是在KTable上执行KStream的查找;变更日志流";从这里。
KStream KTable联接始终是非窗口联接。他们允许你根据KTable(变更日志流(执行表查找接收来自KStream的新记录(记录流(。一个例子用例是用最新的用户简档信息(KTable(。