我们有一个传入的kafka主题,多个基于avro架构的消息序列化。
我们需要根据常见架构属性的某些值将AVRO格式的消息分为多个其他KAFKA主题。
|------> [OUTGOING TOPIC(AVRO) - A]
[INCOMING TOPIC(AVRO)] ----->|------> [OUTGOING TOPIC(AVRO) - B]
|------> [OUTGOING TOPIC(AVRO) - C]
想了解如何实现它,同时避免建立中间客户以在Confluent平台上进行分裂/路由。
我探索了kafka连接器,但没有找到可做此工作的现有连接器。
您可以编写一个Kafka Streams应用程序并使用branch()
:
KStream input = builder.stream("topic");
KStream[] splitStreams = input.branch(...);
splitStream[0].to("output-topic-1");
splitStream[1].to("output-topic-2");
// etc.