如何从kstream应用程序中读取Kafka消息头



是否可以从kstream应用程序读取我在kafkaProducer应用程序中设置的Kafka消息头?《我的卡夫卡制作人》是这样的;我在我的信息中设置了标题

public class Producer {
private final org.slf4j.Logger log = LoggerFactory.getLogger(Producer.class);
@Value("${topic.name}")
private String TOPIC;
private final KafkaTemplate<Integer, testEvent> kafkaTemplate;
@Autowired
public Producer(KafkaTemplate<Integer, testEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendTestEvent(DataDto data) throws Exception {
TestEvent=testEvent.newBuilder()
.setTestEventId(data.getTestEventId())
.setTest(data.getTest().toString())
.build();
Message<testEvent> message = MessageBuilder
.withPayload(event)
.setHeader(KafkaHeaders.TOPIC, TOPIC)
.setHeader(KafkaHeaders.MESSAGE_KEY, 999)
.setHeader(KafkaHeaders.PARTITION_ID, 0)
.setHeader("X-Custom-Header", "Sending Custom Header with Spring Kafka")
.build();
this.kafkaTemplate.send(message);
log.info(String.format("Produced user -> %s", event));
}

Kstream应用程序是

public class MessageReader {

@Bean
public KStream<String, testEvent> kstreamPromotionUppercase(StreamsBuilder builder) {

KStream<String, testEvent> sourceStream = builder.stream("test-topic");
sourceStream.print(Printed.<String, testEvent>toSysOut().withLabel("Original Stream"));

KStream<String, testEvent> uppercaseStream =sourceStream.mapValues(this::MessageReaderCode);

return sourceStream;
}

我如何在kstream中阅读我的标题,我把它放在我的kafkaproducer中。

以下是如何处理KStream:的简单示例

sourceStream.process(() -> new ReadKafkaHeaderProcessor());

然后在ReadKafkaHeaderProcessor的处理方法中,你可以做:

@Override
public void process(Object key, Object value) {
Header header = context.headers().lastHeader(KafkaHeaders.TOPIC);
}

最新更新