我是KStream的新手,想了解如何使用KStream优化处理n大小的一批记录的最佳实践或指导。我有一个如下所示的工作代码,但它一次只适用于单个消息。
KStream<String, String> sourceStream = builder.stream("upstream-kafka-topic",
Consumed.with(Serdes.String(),
Serders.String());
//transform sourceStream using implementation of ValueTransformer<String,String>
sourceStream.transformValues(() -> new MyValueTransformer()).
to("downstream-kafka-topic",
Produced.with(Serdes.String(),
Serdes.String());
上面的代码使用单个记录作为MyValueTransformer
,它实现了ValueTransformer转换单个String
值。如何使以上代码适用于String
值中的Collection
?
您需要以某种方式"缓冲液/聚集体";消息。例如,您可以向转换器添加一个状态存储,并在存储中存储N条消息。只要存储中包含的消息少于N条,就不会进行任何处理,也不会发出任何输出(您可能希望使用允许发出零结果的flatTransformValues
(。
不确定你想要实现什么。Kafka Streams的概念是一次处理一条记录。如果您想处理一个消息集合或一批消息,您有几个选项。
您可能实际上不需要Kafka流,因为您提到的例子与消息没有太大关系,在这种情况下,您可以利用一个普通的Consumer,它将使您能够批量处理。在这里检查spring-Kafka对此的实现->https://docs.spring.io/spring-kafka/docs/current/reference/html/#receiving-消息(Kafka在网络层处理批处理,但通常你会一次处理一条记录,但使用标准客户端处理批处理是可能的(或者你可以将Object值建模为一个消息数组,这样对于每个记录,你都会收到一个包含嵌入集合的对象,然后你可以使用Kafka流来完成它,检查Avro的数组类型->https://avro.apache.org/docs/current/spec.html#Arrays
查看文档的这一部分以更好地理解Kafka流的概念->https://kafka.apache.org/31/documentation/streams/core-concepts