春云流函数不会使用阿夫罗反串器



我有一个Spring Cloud Stream应用程序,我正在尝试利用功能接口。我只使用Strings就得到了概念验证,但现在我正在尝试处理Avro生成的消息,我无法使反序列化工作。我看过类似的问题,但他们要么不与Avro合作,要么让Avro工作,我尝试使用他们的配置都没有成功。

以下是代码和配置。我尝试了各种不同的yml条目,但没有任何运气,甚至尝试创建自己的反序列化程序。

public class StreamFunctions {
@Bean
public Function<KStream<String, MyAvroInputClass>, KStream<String, MyAvroOutputClass>> myConsumer() {
return input -> {
log.error("Received event");
// ignore the message
log.error("Ignoring event");
return input.map((k, v) -> new KeyValue<>(null, null));
};
}
}

Application.yml

---
spring:
application:
name: functional-streams
main:
banner-mode: off
cloud:
function:
definition: myConsumer
stream:
default-binder: kafka
bindings:
myConsumer-in-0:
destination: input-topic
content-type: application/*+avro
consumer:
use-native-decoding: true
auto-startup: true
myConsumer-out-0:
content-type: application/*+avro
destination: output-topic
producer:
use-native-encoding: true
kafka:
streams:
binder:
applicationId: ${info.app.name}
configuration:
commit.interval.ms: 100
schema.registry.url: http://localhost:9091
binder:
producer-properties:
key.serializer: org.apache.kafka.common.serialization.StringSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
schema.registry.url: http://localhost:9092
consumer-properties:
key.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
schema.registry.url: http://localhost:9092
specific.avro.reader: true
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class:  io.confluent.kafka.serializers.KafkaAvroDeserializer
configuration:
value:
deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
spring:
deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer

渐变文件


implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.boot:spring-boot-starter-web")
implementation("org.springframework.boot:spring-boot-starter-webflux")
implementation("org.springframework.boot:spring-boot-starter-aop")
implementation("org.springframework.boot:spring-boot-starter-actuator")
implementation("org.springframework.boot:spring-boot-starter-log4j2")
implementation("org.apache.avro:avro:1.11.0")
implementation("io.confluent:kafka-avro-serializer:7.1.1")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka:3.2.4")
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka-streams
implementation ("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.2.4")
// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-schema
testImplementation("org.springframework.cloud:spring-cloud-stream-schema:2.2.1.RELEASE")
//    implementation("org.springframework.cloud:spring-cloud-schema-registry-client")// https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-function-context
implementation("org.springframework.cloud:spring-cloud-function-context:3.2.6")
implementation("com.fasterxml.jackson.core:jackson-core:2.13.3")
implementation("com.fasterxml.jackson.core:jackson-annotations")
implementation("com.fasterxml.jackson.dataformat:jackson-dataformat-yaml")
implementation("com.fasterxml.jackson.core:jackson-databind")
implementation("org.springdoc:springdoc-openapi-ui:1.6.11")

错误结果:

2022-09-06 15:09:39,987 [ERROR] [core-cls-sync-advertising-df1651cc-0ab8-47b5-b7b9-fd307b6a6d8e-StreamThread-1] LogAndFailExceptionHandler - Exception caught during Deserialization, taskId: 0_0, topic: inbound-topic, partition: 0, offset: 3 {}
org.apache.kafka.common.errors.SerializationException: Can't deserialize data [[0, 0, 0, 0, 6, 0, 0, 0, 0, 2, 8, 74, 111, 104, 110, 2, 6, 68, 111, 101, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]] from topic [inbound-topic]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:588) ~[spring-kafka-2.8.8.jar:2.8.8]

我假设我的yml文件有问题,但我不知道出了什么问题

我认为这与一些配置问题有关。请将您的配置与此示例应用程序进行比较:https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kafka-streams-samples/kafka-streams-avro

看看这里的配置。

最新更新