无法使用 Spring Cloud Stream 反序列化 Kafka 流中的数据



我正在创建一个简单的Kafka流应用程序。我的生产者正在生成一个主题的protobuf序列化消息,我在Kafka流应用程序中使用该主题来处理消费者消息。我试图在我的应用程序中使用valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde反序列化消息。yml文件。我得到以下错误

错误:

org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 2, 0, 10, 13, 84, 105, 109, 101, 114, 32, 109, 101, 115, 115, 97, 103, 101, 16, 1, 34, 12, 8, -126, -107, -127, -120, 6, 16, -12, -88, -117, -114, 2]] from topic [MYINPUT-TOPIC]
Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x1ff0a0d (above 0x0010ffff) at char #1, byte #7)
at com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[jackson-core-2.11.3.jar:2.11.3]
at com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[jackson-core-2.11.3.jar:2.11.3] 

application.yml配置文件:

spring:
cloud:
function:
definition: process
stream:
bindings:
process-in-0:
consumer:
max-attempts: 3
valueSerde: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
back-off-initial-interval: 100
retryable-exceptions:
javax:
validation:
ValidationException: false
destination: MYINPUT-TOPIC
group: consumer-group
concurrency: 2
kafka:
streams:
binder:
brokers: localhost:9092
schema-registry-u-r-l: http://localhost:8081
auto-offset-reset: "earliest"
configuration:
commit-interval-ms: 100

处理在日志中打印实际反序列化消息的方法:

@Component
@Slf4j
public class ProcessStream {
//Here below Timer object is Protobuf's auto-generated class, I am using it to deserialize messages. 
//I'm getting byte Aarry on this method when I'm debugging it.  
@Bean
public Consumer<KStream<String, Timer>> process() {
return (InputStream) -> {
InputStream.foreach((k,v) -> log.info(String.format("key: %s, value: %s",k, v)));
};
}
}

请帮我解决这个问题。我如何在Kafka流中使用protobuf反序列化消息?

您需要使用一个封装了适当的protobuf反序列化器的protobuf Serde。Confluent模式注册表提供了一个protobuf Serde实现。有关详细信息,请参见此。我还没有测试过这个特定的实现,但看起来应该可以工作。如果您正在使用它(或自定义的protobuf server实现),那么只需提供该类型的bean,就可以在Spring Cloud Stream应用程序中注册它。见下文.

@Bean
public KafkaProtobufSerde<Timer> kafkaProtobufSerde() {
}

Spring Cloud Stream中的Kafka Streams绑定器将检测此bean并与您的消费者类型匹配。

基于下面注释中共享的示例应用程序进行更新

在配置中进行以下更改后,我可以运行示例应用程序:

spring:
cloud:
stream:
function:
definition: process
bindings:
process-in-0:
group: consumer-group
concurrency: 1
headerMode: none
destination: MESSAGING-TIMER-EXAMPLE
kafka:
streams:
binder:
configuration:
schema.registry.url: http://localhost:8081

您需要kafka.streams.binder前缀。在进行更改之后,我能够启动应用程序而不会出现任何错误。我在启动日志中看到这些。

2021-07-29 15:10:42.995  INFO 65137 --- [           main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Key Serde used for process-in-0: org.apache.kafka.common.serialization.Serdes$StringSerde
2021-07-29 15:10:42.996  INFO 65137 --- [           main] .c.s.b.k.s.KafkaStreamsFunctionProcessor : Value Serde used for process-in-0: io.confluent.kafka.streams.serdes.protobuf.KafkaProtobufSerde
...
...
2021-07-29 15:10:43.629  INFO 65137 --- [           main] c.j.ListenerMessagingStreamsApplication  : Started ListenerMessagingStreamsApplication in 1.836 seconds (JVM running for 2.305)

我注意到您将yaml配置文件命名为application-LOCAL.yml。如果这是您正在使用的,那么请确保在运行时设置此属性--spring.config.name=application-LOCAL。否则,将文件重命名为application.yml

最新更新