我的场景是我使用了很多共享前缀的Kafka主题(例如house.door,house.room) 并使用 Kafka 流正则表达式主题模式 API 使用所有主题。 一切看起来都不错,我得到了数据的密钥和消息。
为了处理数据,我需要主题名称,以便我可以根据主题名称进行连接, 但我不知道如何在卡夫卡流 DSL 中获取主题名称。
解决我的问题的一种可能方法是将主题名称与我的消息一起保存。 但是如果我可以直接获得主题名称会更好。
那么,如何在卡夫卡流中获取当前的卡夫卡主题?
为了补充Matthias J. Sax的观点,我附上了示例代码来展示如何做到这一点。
public static void main(final String[] args) {
try {
final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streamProcessor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.STATE_DIR_CONFIG, "state-store");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
StreamsBuilder streamsBuilder = new StreamsBuilder();
final KStream<String, String> textLines = streamsBuilder.stream(inputTopicList);
final KStream<String, String> textLines = builder.stream(inputTopiclist);
textLines.transform(getTopicDetailsTransformer::new)
.foreach(new ForeachAction<String, String>() {
public void apply(String key, String value) {
System.out.println(key + ": " + value);
}
});
textLines.to(outputTopic);
} catch (Exception e) {
System.out.println(e);
}
}
private static class getTopicDetailsTransformer implements Transformer<String, String, KeyValue<String, String>> {
private ProcessorContext context;
@Override
public void init(final ProcessorContext context) {
this.context = context;
}
public KeyValue<String, String> transform(final String recordKey, final String recordValue) {
//here i am returning key as topic name.
return KeyValue.pair(context.topic(), recordValue);
}
@Override
public void close() {
// Not needed.
}
}
常见问题解答:https://docs.confluent.io/current/streams/faq.html#accessing-record-metadata-such-as-topic-partition-and-offset-information
记录元数据可通过处理器 API 访问。由于其处理器API集成,它也可以通过DSL间接访问。
使用 Processor API,您可以通过 ProcessorContext 访问记录元数据。您可以在 Processor#init() 期间在处理器的实例字段中存储对上下文的引用,然后在 Processor#process() 中查询处理器上下文,例如(与 Transformer 相同)。上下文会自动更新以匹配当前正在处理的记录,这意味着 ProcessorContext#partition() 等方法始终返回当前记录的元数据。在调度的 punctuate() 函数中调用处理器上下文时,需要注意一些注意事项,有关详细信息,请参阅 Javadocs。
例如,如果将 DSL 与自定义转换器结合使用,则可以将输入记录的值转换为还包括分区和偏移量元数据,然后后续 DSL 操作(如映射或筛选器)可以利用此信息。
常见问题解答:https://docs.confluent.io/current/streams/faq.html#accessing-record->元数据,例如主题分区和偏移量信息
我还可以确认处理器 API 允许您访问 kafka 记录的元数据。值得强调的是,记录的元数据应从org.apache.kafka.streams.processor
下的ProcessorContext
中提取,而不是从org.apache.kafka.streams.processor.api
包中提取,否则元数据不正确。更具体地说,我注意到在使用后一个包中的处理器时,记录的offset
总是0
。