我已经通过处理器API的transform((或process((方法提到了几乎所有关于KStream上日志偏移的问题,就像这里的许多问题中提到的那样-
如何在KStream 中获取偏移值
但是我不能得到答案,所以我问这个问题。
每次消息被流消耗时,我想记录分区、consumer-group-id和偏移量,我不知道如何将process((或transform((方法与ProcessorContext API集成?如果我在CustomParser类中实现Processor接口,那么我必须实现所有的方法,但我不确定这是否有效,就像Record Meta Data的合并文档中提到的那样https://docs.confluent.io/current/streams/developer-guide/processor-api.html#streams-开发者指南处理器api
我已经在如下的spring-boot应用程序中设置了KStreams(为了参考,更改了变量名(
@Bean
public Set<KafkaStreams> myKStreamJson(StreamsBuilder profileBuilder) {
Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
final KStream<String, JsonNode> pStream = myBuilder.stream(inputTopic, Consumed.with(Serdes.String(), jsonSerde));
Properties props = streamsConfig.kStreamsConfigs().asProperties();
pstream
.map((key, value) -> {
try {
return CustomParser.parse(key, value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return new KeyValue<>(null, null);
}
)
.filter((key, value) -> {
try {
return MessageFilter.filterNonNull(key, value);
} catch (Exception e) {
LOGGER.error("Error occurred - " + e.getMessage());
}
return false;
})
.through(
outputTopic,
Produced.with(Serdes.String(), new JsonPOJOSerde<>(TransformedMessage.class)));
return Sets.newHashSet(
new KafkaStreams(profileBuilder.build(), props)
);
}
实现Transformer
;保存init()
中的ProcessorContext
;然后,您可以访问transform()
中的记录元数据,并简单地返回原始键/值。
这是一个变压器的例子。Spring为Apache Kafka提供了调用Spring Integration流来转换键/值的功能。