Kafka流:一条记录到多个记录



给定:我在kafka中有两个主题,例如主题A和主题B。Kafka流从主题A中读取记录,对其进行处理并产生多个记录(例如,RecordA和RecordB(对应于消耗记录。现在,问题是如何使用Kafka流来实现此目标。

KStream<String, List<Message>> producerStreams[] = recordStream.mapValues(new ValueMapper<Message, List<Message>>() {
        @Override
        public List<Message> apply(final Message message) {
          return consumerRecordHandler.process(message);
        }
    }).*someFunction*()

在这里,记录读是消息;处理后,它返回消息列表。如何将此列表分为两个生产者流?任何帮助将不胜感激。

我不确定我是否正确理解了这个问题,也不了解@abhishek的答案:(

如果您有输入流,并且要获得每个输入记录的零,一个或更多输出记录,则将应用flatMap()flatMapValues()(取决于要修改密钥(。

您还询问"如何将此列表分为两个生产者流?"如果您的意思是将一个流分为多个,可以使用branch()

有关更多详细信息,我指的是文档:https://docs.confluent.io/platform/current/streams/developer-guide/dsl-api.html#stateless-transformations

您的密钥是什么?我猜它不是String。执行mapValues后,您将拥有 - KStream<K,List<Message>>。如果K不是String,则someFunction()可以是map,它将K转换为String(如果是,您已经有结果(,并将List<Message>(值(放在未触及的情况下,因为那是您的预期最终结果

相关内容

  • 没有找到相关文章

最新更新