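I am trying to read CloudEvents messages in batch mode from Spring Cloud Stream using the Kafka binder. With a custom class and a custom serializer/deserializer everything works, but with CloudEvents the messages never arrive.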
spring:
  cloud:
    function.definition: consumer
    stream:
      bindings:
        producer-out-0:
          destination: audit
          group: audit-producer
          producer:
            useNativeEncoding: true
        consumer-in-0:
          destination: audit
          group: audit-consumer
          consumer:
            batch-mode: true
            useNativeDecoding: true
      kafka:
        binder:
          brokers: localhost:9092
          consumer-properties:
            max.poll.records: 5
            fetch.min.bytes: 10000
            fetch.max.wait.ms: 10000
        bindings:
          producer-out-0:
            producer:
              configuration:
                cloudevents:
                  serializer:
                    encoding: STRUCTURED
                    event_format: application/cloudevents+json
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                # value.serializer: com.sagar.audit.watcher.domain.MessageSerializer
                value.serializer: io.cloudevents.kafka.CloudEventSerializer
          consumer-in-0:
            consumer:
              configuration:
                key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                # value.deserializer: com.sagar.audit.watcher.domain.MessageDeserializer
                value.deserializer: io.cloudevents.kafka.CloudEventDeserializer
For the consumer, I tried both a List<CloudEvent> and a single CloudEvent:
@Bean
public Consumer<List<CloudEvent>> consumer() {
    System.out.println("inside consumer");
    // Single-event variant:
    // return auditMessage -> System.out.println("data at loop--" + thread + " -- " + auditMessage);
    return s -> s.forEach(auditMessage -> System.out.println("data at loop--" + thread + " -- " + auditMessage));
}
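If I use just Consumer (the single-event form), I get the error below. It suggests that deserialization is happening, but somehow the message does not reach the consumer.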
2022-10-26 20:31:24.070 WARN [,8289fada18f22581,831ea94d13ef311e] 64368 --- [container-0-C-1] s.c.f.c.c.SmartCompositeMessageConverter : Failure during type conversion by org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter@3bf97caf. Will try the next converter.
org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]; nested exception is com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:237) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.cloud.stream.converter.ApplicationJsonMessageMarshallingConverter.convertFromInternal(ApplicationJsonMessageMarshallingConverter.java:115) ~[spring-cloud-stream-3.2.5.jar:3.2.5]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:185) ~[spring-messaging-5.3.23.jar:5.3.23]
at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:176) ~[spring-messaging-5.3.23.jar:5.3.23]
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `io.cloudevents.CloudEvent` (no Creators, like default constructor, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information
at [Source: (String)"[CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-1"}}, extensions={}}, CloudEvent{id='hello', source=http://localhost, type='example.kafka', datacontenttype='application/json', data=JsonCloudEventData{node={"id":null,"name":"sagar-cloud-2"}}, extensions={}}]"; line: 1, column: 1]
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:67) ~[jackson-databind-2.13.4.2.jar:2.13.4.2]
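You appear to be using the Cloud Events SDK. I am not sure what value you are getting from it, since you can do everything (and even more) without it. Here are two posts where I discuss this: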
https://spring.io/blog/2020/12/10/cloud-events-and-spring-part-1
https://spring.io/blog/2020/12/23/cloud-events-and-spring-part-2
In any case, if you still want to rely on SDK types such as CloudEvent, you are probably missing a dependency. We also have an SDK-based sample, so its pom.xml may help:
https://github.com/spring-cloud/spring-cloud-function/blob/main/spring-cloud-function-samples/function-sample-cloudevent-sdk/pom.xml
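The dependency snippet itself is not shown here. As a hedged sketch only, assuming the module meant is the CloudEvents SDK's Spring integration (`io.cloudevents:cloudevents-spring`, which provides Spring message converters for `CloudEvent`), the declaration might look like the fragment below. The `cloudevents.version` property is a hypothetical placeholder, not something taken from the question or the answer; check the linked pom.xml for the actual artifacts and versions.

```xml
<!-- Assumption: the missing dependency is the CloudEvents Spring integration module. -->
<dependency>
    <groupId>io.cloudevents</groupId>
    <artifactId>cloudevents-spring</artifactId>
    <!-- Hypothetical property; define it in <properties> or use a managed version. -->
    <version>${cloudevents.version}</version>
</dependency>
```

The stack trace shows Jackson trying, and failing, to instantiate the `io.cloudevents.CloudEvent` interface, which is why a CloudEvents-aware converter on the classpath is a plausible fix. Whether it also covers batch mode with native decoding is not confirmed by the answer.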
Otherwise, the best thing you can do is create a small application that reproduces the problem, push it to GitHub, and send a link so that we can take a look.