Header在Kafka处理器API中有什么用途



我正在学习Kafka处理器API,并在ProcessorContext中找到一个方法头。

headers​()

返回当前输入记录的标题;可能是如果不可用,则为空

这个方法有什么用?

在文档中只写一行:

返回当前输入记录的标题;如果不可用

我可以对此执行一些操作吗?

标头是可以附加到每条消息的某种元数据。标头可以用于各种场景,如添加过滤记录时可以使用的信息等。


您可以通过处理器API访问消息的元数据,更确切地说是process()transform()transformValues()。例如,为了在记录中添加一个标题,以下操作将起作用:

public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "value")
}

标记为已接受的答案并不完全完整。

事实上,标头已添加到记录中,但记录不会发送回主题。

为此,我们需要使用正向方法:

public void process(Record<RecordKey, RecordValue> record) {
// add a header to the elements
record.headers().add("key", "value".getBytes());
// forward the record
context.forward(record);
}

相关内容

  • 没有找到相关文章

最新更新