随着Kafka 0.11中记录(ProducerRecord&ConsumerRecord)中添加了Header,在使用Kafka Streams处理主题时是否可以获得这些Header?当在KStream
上调用类似map
的方法时,它提供了记录的key
和value
的参数,但我看不到访问headers
的方法。如果我们能在ConsumerRecord
上只使用map
,那就太好了。
前任。
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((key, value) -> ... ) // can I get access to headers in methods like map, filter, aggregate, etc?
...
像这样的东西会起作用:
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> stream = kStreamBuilder.stream("some-topic");
stream
.map((record) -> {
record.headers();
record.key();
record.value();
})
...
自2.0.0版本以来,可以访问记录头(有关详细信息,请参阅KIP-244)。
您可以通过处理器API(即通过transform()
、transformValues()
或process()
)访问记录元数据;上下文";对象(参见。https://docs.confluent.io/current/streams/developer-guide/processor-api.html#accessing-处理器上下文)。
更新
从2.7.0版本开始,对处理器API进行了改进(参见KIP-478),添加了一个新的类型安全api.Processor
类,该类使用process(Record)
而不是process(K, V)
方法。在这种情况下,可以通过Record
类访问头(和记录元数据)。
这个新功能在";DSL的PAPI方法(例如KStream#process()
、KStream#transform()
和同级)。
+++++
在2.0之前,上下文只公开主题、分区、偏移量和时间戳,而不公开Streams在旧版本中读取时实际丢弃的头。
元数据在DSL级别上不可用。然而,通过KIP-159扩展DSL的工作也在进行中。