给定:我在kafka中有两个主题,例如主题A和主题B。Kafka流从主题A中读取记录,对其进行处理并产生多个记录(例如,RecordA和RecordB(对应于消耗记录。现在,问题是如何使用Kafka流来实现此目标。
KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
@Override
public List<Message> apply(final Message message) {
return consumerRecordHandler.process(message);
}
}).*someFunction*()
在这里,记录读是消息;处理后,它返回消息列表。如何将此列表分为两个生产者流?任何帮助将不胜感激。
我不确定我是否正确理解了这个问题,也不了解@abhishek的答案:(
如果您有输入流,并且要获得每个输入记录的零,一个或更多输出记录,则将应用flatMap()
或flatMapValues()
(取决于要修改密钥(。
您还询问"如何将此列表分为两个生产者流?"如果您的意思是将一个流分为多个,可以使用branch()
。
有关更多详细信息,我指的是文档:https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations
您的密钥是什么?我猜它不是String
。执行mapValues
后,您将拥有 - KStream<K,List<Message>>
。如果K
不是String
,则someFunction()
可以是map
,它将K
转换为String
(如果是,您已经有结果(,并将List<Message>
(值(放在未触及的情况下,因为那是您的预期最终结果