卡夫卡从一个特定的主题流式传输KTable创作



我正在做一个项目,我被KTable卡住了。

我想从一个主题中获取记录,并将它们放在KTable(存储(中,这样我就有了1个记录作为1个键。

static KafkaStreams streams;
final Serde<Long> longSerde = Serdes.Long();
final Serde<byte[]> byteSerde = Serdes.ByteArray();
static String topicName;
static String storeName;
final StreamsBuilder builder = new StreamsBuilder();
KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<Long>() {
@Override
public Long apply(Long aggValue, Long newValue) {
return newValue;
}
}, 
storeName);

这是我认为最接近答案的一次。

您的方法是正确的,但您需要使用正确的serdes。

.reduce((函数中,值类型应为byte[]

KStream<Long, byte[]> streamed = builder.stream(topicName, Consumed.with(longSerde, byteSerde));
KTable<Long, byte[]> records = streamed.groupByKey().reduce(
new Reducer<byte[]>() {
@Override
public byte[] apply(byte[] aggValue, byte[] newValue) {
return newValue;
}
}, 
Materialized.as(storename).with(longSerde,byteSerde));

相关内容

  • 没有找到相关文章

最新更新