我有一个与KTable相关的实现,并使用CloudEvents来生成事件,但由于一些未知的原因,KTable生成的事件不是基于CloudEvent格式化的。实现如下:
public void initKafkaStream() {
StreamsBuilder streamsBuilder = new StreamsBuilder();
PojoCloudEventDataMapper<TicketEvent> ticketEventMapper = PojoCloudEventDataMapper.from(objectMapper, TicketEvent.class);
KStream<String, CloudEvent> rawTicketStream = streamsBuilder.stream(rawTicketEvent, Consumed.with(Serdes.String(), cloudEventSerde));
rawTicketStream
.mapValues(e -> convertToPojo(e, TicketEventMapper))
.filter((k, v) -> v != null)
.groupByKey()
.aggregate(
AggregatedTicketEvent::new,
(key, val, agg) -> doAggregation(agg, val),
Materialized
.<String, AggregatedTicketEvent, KeyValueStore<Bytes, byte[]>>as("aggregatedTicket")
.withValueSerde(aggregatedTicketEventSerde)
.withLoggingDisabled()
)
.mapValues(result -> {
try {
return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withType("ticket_update")
.withSource(sourceTemplate.expand(result.getCurrent().getId()))
.withTime(result.getMeta().getOccurredAt())
.withData(objectMapper.writeValueAsBytes(result))
.withDataContentType("application/json")
.build();
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
.toStream()
.to(aggregatedTicketEvent, Produced.with(Serdes.String(), cloudEventSerde));
streams = new KafkaStreams(streamsBuilder.build(streamsConfig), streamsConfig);
streams.setUncaughtExceptionHandler(ex -> StreamThreadExceptionResponse.REPLACE_THREAD);
streams.start();
}
有人有这样的问题吗?
提前感谢
问题是props中的配置已从序列化程序/反序列化程序中的Kafka流中覆盖,并且默认情况下将格式设置为Encoding.BINARY。当编码为BINARY时,CloudEvents格式仅存在于标头中,而不存在于有效负载中。为了确保序列化程序具有正确的配置,我在CloudEventSerializer
和CloudEventDeserializer
中添加了它们。在这种情况下,Serdes.serdeFrom()
将如下所示:
Map<String, Object> ceSerializerConfigs = new HashMap<>();
ceSerializerConfigs.put(ENCODING_CONFIG, Encoding.STRUCTURED);
ceSerializerConfigs.put(EVENT_FORMAT_CONFIG, JsonFormat.CONTENT_TYPE);
CloudEventSerializer serializer = new CloudEventSerializer();
serializer.configure(ceSerializerConfigs, false);
CloudEventDeserializer deserializer = new CloudEventDeserializer();
deserializer.configure(ceSerializerConfigs, false);
this.cloudEventSerde = Serdes.serdeFrom(serializer, deserializer);
为了在JSON负载中获得云事件格式,我们必须使用Encoding.StructureED和JSON内容类型,这将发挥神奇的作用,并在负载中获得结果。
希望这能帮助到那些在这个问题上挣扎的人!
最佳,