我们使用kafka流处理一个输入流和一个带有客户端数据的压缩主题。在我们的流处理中,我们使用第一个,并使用GlobalKTable将它与第二个连接起来,例如
StreamsBuilder builder = new StreamsBuilder();
GlobalKTable<String, Client> clients
= builder.globalTable(CLIENT_TOPIC, Consumed.with(Serdes.String(), clientSerde));
KStream<String, Foo> foos = builder.stream(FOO_TOPIC);
KStream<String, Bar> bars = foos
.leftJoin(
clients,
(streamKey, streamValue) -> streamValue.getClientId().toString(),
new FooClientJoiner()
);
这基本上可以工作,但是当然,整个CLIENT_TOPIC存储在内存中(我们使用RocksDB来存储状态,所以它在HDD上),使用非琐碎空间-这成为一个问题。
我们知道,从整个客户端数据中,我们只使用一列-所以如果我们有像
这样的内容StreamsBuilder builder = new StreamsBuilder();
// GlobalKTable from reduced topic
GlobalKTable<String, ReducedClient> reducedClients
= builder.stream(CLIENT_TOPIC)
.map((key,value) -> new KeyValue<String, String>(key, value.getTheOneColummnINeed()))
.toGlobalKTable();
KStream<String, Foo> foos = builder.stream(FOO_TOPIC);
KStream<String, Bar> bars = foos
.leftJoin(
reducedClients,
(streamKey, streamValue) -> streamValue.getClientId().toString(),
new FooReducedClientsJoiner()
);
它将解决我们的问题。从这样的流创建一个GlobalKTable是不可能的-但是有一些等效的吗?或者有什么方法可以减少缓存的CLIENT_TOPIC占用的空间,因为我们只需要一小部分数据?
我还尝试手动修改生成的Client类(以便它只包含我需要的内容),但这并不影响存储的GlobalKTable的最终大小。
顺便说一句,在RocksDB中玩压缩设置并没有多大帮助。
您可以使用Kafka Streams将单个列写入单独的主题:
reducedClients
= builder.stream(CLIENT_TOPIC)
.map((key,value) -> new KeyValue<String, String>(key, value.getTheOneColummnINeed()))
.to(REDUCED_TOPIC);
KStream<String, Foo> foos = builder.stream(FOO_TOPIC);
GlobalKTable<String, Client> clients
= builder.globalTable(REDUCED_TOPIC, Consumed.with(Serdes.String(), clientColumnSerde));
KStream<String, Bar> bars = foos
.leftJoin(
clients,
(streamKey, streamValue) -> streamValue.getClientId().toString(),
new FooReducedClientsJoiner()
);
或者,您应该考虑对两个流进行共分区,以便您可以为连接使用非全局KTable。