Kafka Streams在转换后有条件地生成消息



我们有一个用例,我们必须将一些消息读取到KStream中,然后转换消息并有条件地将其生成到另一个主题。

在我们的用例中,为了转换对象,我们进行一个下游的API调用。如果API调用成功,则生成到newTopic1,否则生成到newTopic2。如何实现同样的目标??

到目前为止,我们正在使用Streams API提供的方法,为新的Kafka主题生成丰富的(即转换的对象(。

KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to('newTopic1', Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));

感谢对此的回应。

使用DSL api可以使用KStream::filterKStream:to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced)

如果两种格式相同,示例代码将是这样的:

KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to((key, value, recordContext) -> topicNameCalculation(key, value), Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));

topicNameCalculation(...)会根据关键字和值选择合适的主题。

One Notice通常在Kafka Streams中进行外部调用不是一种好方法。

相关内容

  • 没有找到相关文章

最新更新