我们有一个用例,我们必须将一些消息读取到KStream中,然后转换消息并有条件地将其生成到另一个主题。
在我们的用例中,为了转换对象,我们进行一个下游的API调用。如果API调用成功,则生成到newTopic1,否则生成到newTopic2。如何实现同样的目标??
到目前为止,我们正在使用Streams API提供的到方法,为新的Kafka主题生成丰富的(即转换的对象(。
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to('newTopic1', Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));
感谢对此的回应。
使用DSL api可以使用KStream::filter
或KStream:to(TopicNameExtractor<K, V> topicExtractor, Produced<K, V> produced)
。
如果两种格式相同,示例代码将是这样的:
KStream<String, Customer> transformedStream = sourceKafkaStream
.mapValues(cust -> {
try {
logger.info("Hitting to the downstream to fetch additional information, will take few seconds.");
Thread.sleep(7000);
return RecordEnrichmentAndTransformationBuilder.enrichTheCustomer(cust);
} catch (InterruptedException e) {
e.printStackTrace();
}
return cust;
});
.to((key, value, recordContext) -> topicNameCalculation(key, value), Produced.with(AppSerdes.String(), AppSerdes.serdeForEnrichedCustomer()));
topicNameCalculation(...)
会根据关键字和值选择合适的主题。
One Notice通常在Kafka Streams中进行外部调用不是一种好方法。