是否可以使用Kafka Streams访问消息头



随着Kafka 0.11中记录(ProducerRecord&ConsumerRecord)中添加了Header,在使用Kafka Streams处理主题时是否可以获得这些Header?当在KStream上调用类似map的方法时,它提供了记录的keyvalue的参数,但我看不到访问headers的方法。如果我们能在ConsumerRecord上只使用map,那就太好了。

前任。

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) ->  ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
... 

像这样的东西会起作用:

KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...

自2.0.0版本以来,可以访问记录头(有关详细信息,请参阅KIP-244)。

您可以通过处理器API(即通过transform()transformValues()process())访问记录元数据;上下文";对象(参见。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-处理器上下文)。

更新

从2.7.0版本开始,对处理器API进行了改进(参见KIP-478),添加了一个新的类型安全api.Processor类,该类使用process(Record)而不是process(K, V)方法。在这种情况下,可以通过Record类访问头(和记录元数据)。

这个新功能在";DSL的PAPI方法(例如KStream#process()KStream#transform()和同级)。

+++++

在2.0之前,上下文只公开主题、分区、偏移量和时间戳,而不公开Streams在旧版本中读取时实际丢弃的头。

元数据在DSL级别上不可用。然而,通过KIP-159扩展DSL的工作也在进行中。

最新更新