使用单个json,但发布多个AVRO消息



我是Spring kafka Stream的新手。我最近建立了一个项目,并尝试了kafka流API来发布AVRO。要求:需要解析一个复杂的Jsonobject,它可能有多个子json。解析json并为每个子json发送AVRO消息。它也可以是一个或多个。

我知道如何使用kafka客户端API来做到这一点。它很简单,可以用Loop来完成,但使用Stream API,我是新手。

流类的当前代码如下所示:

@Service
@Log4j2
@EnableBinding(StreamBindings.class)
public class StreamListener {
@Autowired
Service service; 

@StreamListener("Processor-input-channel")
@SendTo("Processor-output-channel")
public KStream<String, AVROClass> process(KStream<String, String> input){
//parse input , map to fault and change received key to time stamp and send 
KStream<String, AVROClass> kStream = input
.mapValues(v -> service.getAVROResponse(v))
.map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v));
kStream.foreach((k, v) -> log.info(String.format("Key: %s, Value: %s", k, v)));


return kStream;

}

我一次生成一个AVRO,但我需要知道如何生成多个AVRO并将其作为kafka中的单独消息发送到输出主题。由于我从getAVROResponse返回的内容将自动通过流发送出去。

要使用JSON,需要使用String/JON-Serde,而不是AvroClass(假设这实际上是AvroSpecificRecord子类)。

否则,您似乎缺少Avro SerdeConsumed.with(<<avroSerde>>)),或者在StreamsConfig 中将其设置为默认Serde

如果您想将传入的消息分隔为几个不同的主题,请根据某些条件(嵌套类型)使用branch

如果要将一条消息转换为多条消息,然后转换为某个输出主题,请使用flatMapValues().to()

要生产Avro,请确保在.to()输出中使用Produced.with()

您应该能够使用Kafka Streams和Spring Cloud Stream完成此用例,如下所示。请注意,不再建议使用StreamListener,并建议使用功能方法。请参见下文。

这只是一个psuedo代码,请根据需要进行更新。

@SpringBootApplication
public class MyApplication {
@Bean
public Function<KStream<String, String>, KStream<String, AvroClass>> process() {
return input -> input.mapValues(v -> service.getAVROResponse(v))
.map((k,v)->KeyValue.pair((Long.toString((System.currentTimeMillis()))), v));
}
@Bean
public Serde<AvroClass> avroInSerde(){
final SpecificAvroSerde<AvroClass> avroSerde = new SpecificAvroSerde<>();
avroSerde.configure(...)
return avroInSerde;
}
}

在配置中,您需要此功能,具体取决于您的设置。请适当更新架构注册表URL。

spring:
cloud:
stream:
kafka:
streams:
binder:
configuration:
schema.registry.url: http://localhost:8081
specific.avro.reader: true

如果您不想如上所述将Avro serde指定为Spring bean,则可以按如下所述进行配置。

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

然而,我认为将avro serde指定为Spring bean可能是一种更干净的方法,因为Spring Cloud Stream绑定器可以根据serde bean的类型为函数签名进行类型匹配。例如,在这种情况下,绑定器将内省该函数,并将其输出签名KStream<String, AvroClass>与Serde bean上定义的内容Serde<AvroClass>相匹配,并将该签名指定为出站上的值Serde。

相关内容

  • 没有找到相关文章

最新更新