使用Spring Cloud Stream Kafka Streams和Avro输入/输出,nativeEncoding



我们正在通过支持Avro输入/输出记录的Spring Cloud Stream功能测试Kafka Streams的使用,但设置了nativeEncoding=falsenativeDecoding=false,以便在进行Avro转换时使用自定义MessageConverter

默认的序列号为StringSerde(用于键(和ByteArraySerde(用于值(。

当我们只使用KStream-KStream函数时,一切都正常,例如:

@Bean
public Function<KStream<String, DataRecordAvro>, KStream<String, DataRecordAvro>> wordsCount() {
return input -> input
.flatMapValues(value -> Arrays.asList(value.getName().toString().toLowerCase().split("\W+")))
.map((key, value) -> new KeyValue<>(value, value))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(5)).grace(Duration.ofMillis(0)))
.count()
.toStream()
.map((key, value) -> new KeyValue<>(key.key(), new DataRecordAvro(key.key(), value)));
}

但当我们尝试一个更复杂的例子,涉及像这样的输入KTable时:

@Bean
public BiFunction<KStream<String, DataRecordAvro>, KTable<String, DataRecordAvro>, KStream<String, DataRecordAvro>> userClicksRegionKTableAvro() {
return (userClicksStream, usersRegionKTable) -> userClicksStream
.leftJoin(usersRegionKTable,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region.getName().toString(), clicks.getCount()))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.mapValues((key, value) -> new DataRecordAvro(key, value))
.toStream();
}

(DataRecordAvro类只有两个成员:CharSequence name; Long count;(

当收到第一条记录时,抛出此异常:

ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.String, and value: com.xxxx.kstreams.fixtures.avro.DataRecordAvro.
Note that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.

引发异常的处理器似乎是:

KSTREAM-LEFTJOIN-0000000011:
states:     [user-regions-avro-STATE-STORE-0000000008]

我们不知道为什么它在这种情况下不起作用。也许leftJoin操作将信息保存到内部主题,而useNativeEncoding/Decoding=false没有被考虑在内?但是为什么kstream->上面的kstream示例有效吗?我们认为Avro转换只在Topology的开始和结束时完成,为什么在使用leftJoin时出现这种强制转换异常?

下面是另一个工作正常的示例(没有输入Avro记录,将消费者useNativeDecoding保留为默认值true(:

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, DataRecordAvro>> userClicksRegionKTable() {
return (userClicksStream, usersRegionKTable) -> userClicksStream
.leftJoin(usersRegionKTable,
(clicks, region) -> new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
.map((user, regionWithClicks) -> new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum)
.mapValues((key, value) -> new DataRecordAvro(key, value))
.toStream();
}

请帮忙!

对于Spring Cloud Stream中的Kafka Streams绑定器,我们建议使用Serdes的本地解码/编码,除非您有充分的理由依赖消息转换方法。是什么用例迫使您在这里使用消息转换器?在实践中,在Spring Cloud Stream中的Kafka Streams应用程序中使用消息转换器进行序列化会在拓扑结构中添加一个额外的层,并使其更深入,因此建议使用本机解码/编码。

正如您所指出的,对于KTable,绑定器总是使用本地解码——目前,不可能在那里使用消息转换器。当您在KTable绑定上关闭useNativeDecoding时,绑定器会忽略它,只使用默认的字节序列号。我建议使用默认的KTable绑定,然后在应用程序配置中添加以下bean。

@Bean
public Serde< DataRecordAvro> dataRecordAvroSerde() {
// return Serde
}

通过这种方式,绑定器将检测这个bean,并意识到Serde类型与函数签名中的类型匹配,然后在这些输入中使用它。

如果您对此应用程序有进一步的问题,请随时分享MCRE。那么我们可以进一步研究。

相关内容

  • 没有找到相关文章

最新更新