我正在学习Kafka处理器API,并在ProcessorContext
中找到一个方法头。
headers()
返回当前输入记录的标题;可能是如果不可用,则为空
这个方法有什么用?
在文档中只写一行:
返回当前输入记录的标题;如果不可用
我可以对此执行一些操作吗?
标头是可以附加到每条消息的某种元数据。标头可以用于各种场景,如添加过滤记录时可以使用的信息等。
您可以通过处理器API访问消息的元数据,更确切地说是process()
、transform()
和transformValues()
。例如,为了在记录中添加一个标题,以下操作将起作用:
public void process(String key, String value) {
// add a header to the elements
context().headers().add.("key", "value")
}
标记为已接受的答案并不完全完整。
事实上,标头已添加到记录中,但记录不会发送回主题。
为此,我们需要使用正向方法:
public void process(Record<RecordKey, RecordValue> record) {
// add a header to the elements
record.headers().add("key", "value".getBytes());
// forward the record
context.forward(record);
}