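I have a reactive Kafka application that reads from one topic and writes to another. The topic has multiple partitions, and I want to create as many consumers (in the same consumer group) as there are partitions in the topic. From my understanding of this thread, .receive() creates only one instance of KafkaReceiver, which reads from all partitions of the topic. So I need multiple receivers to read from different partitions in parallel.
To do that, I wrote the following code: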
@Bean
public ReceiverOptions<String, String> kafkaReceiverOptions(String topic, KafkaProperties kafkaProperties) {
    ReceiverOptions<String, String> basicReceiverOptions = ReceiverOptions.create(kafkaProperties.buildConsumerProperties());
    // Subscribe to the source topic and log partition assignment changes.
    return basicReceiverOptions.subscription(Collections.singletonList(topic))
            .addAssignListener(receiverPartitions -> log.debug("onPartitionAssigned {}", receiverPartitions))
            .addRevokeListener(receiverPartitions -> log.debug("onPartitionsRevoked {}", receiverPartitions));
}

@Bean
public ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate(ReceiverOptions<String, String> kafkaReceiverOptions) {
    return new ReactiveKafkaConsumerTemplate<String, String>(kafkaReceiverOptions);
}

@Bean
public ReactiveKafkaProducerTemplate<String, List<Object>> kafkaProducerTemplate(
        KafkaProperties properties) {
    Map<String, Object> props = properties.buildProducerProperties();
    return new ReactiveKafkaProducerTemplate<String, List<Object>>(SenderOptions.create(props));
}
public void run(String... args) {
    // Start one read/write pipeline per partition of the source topic.
    for (int i = 0; i < topicPartitionsCount; i++) {
        readWrite(destinationTopic).subscribe();
    }
}
public Flux<String> readWrite(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .doOnNext(consumerRecord -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(), Thread.currentThread().getName()))
            // Forward each record to the destination topic.
            .doOnNext(s -> sendToKafka(s, destTopic))
            .map(ConsumerRecord::value)
            .onErrorContinue((exception, errorConsumer) -> {
                log.error("Error while consuming : {}", exception.getMessage());
            });
}
public void sendToKafka(ConsumerRecord<String, String> consumerRecord, String destTopic) {
    kafkaProducerTemplate.send(destTopic, consumerRecord.key(), transformRecord(consumerRecord))
            .doOnNext(senderResult -> log.info("Record received from partition {} in thread {}", consumerRecord.partition(), Thread.currentThread().getName()))
            .doOnSuccess(senderResult -> {
                // Note: `metrics` is not defined in this snippet; presumably it is a field of the enclosing class.
                log.debug("Sent {} offset : {}", metrics, senderResult.recordMetadata().offset());
            })
            .doOnError(exception -> {
                log.error("Error while sending message to destination topic : {}", exception.getMessage());
            })
            .subscribe();
}
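When I test this, it seems to work fine: multiple KafkaReceiver instances are created and they process the partitions in parallel. My question is, is this the most efficient way to create multiple instances? Is there any other way to do this in reactive Kafka?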
What you are doing is correct.
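To illustrate why matching the number of consumers to the number of partitions spreads the work, here is a minimal sketch in plain Java. It is a simplified model, not Kafka's actual assignor code: the class `PartitionAssignmentSketch` and the helper `assign` are hypothetical names made up for this illustration, and the round-robin rule is an assumption. The real assignment depends on the configured assignment strategy.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PartitionAssignmentSketch {

    // Hypothetical helper: spreads partition numbers over the consumers of one
    // group in round-robin order. Simplified model, not Kafka's real assignor.
    static Map<Integer, List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        Map<Integer, List<Integer>> assignment = new TreeMap<>();
        for (int consumer = 0; consumer < consumerCount; consumer++) {
            assignment.put(consumer, new ArrayList<>());
        }
        // Each partition is owned by exactly one consumer of the group.
        for (int partition = 0; partition < partitionCount; partition++) {
            assignment.get(partition % consumerCount).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One consumer: it reads every partition.
        System.out.println(assign(3, 1)); // prints {0=[0, 1, 2]}
        // As many consumers as partitions: one partition each.
        System.out.println(assign(3, 3)); // prints {0=[0], 1=[1], 2=[2]}
    }
}
```

In this model, adding more consumers than partitions would leave the extra consumers with empty lists, which is why the partition count is a sensible upper bound for the loop in `run`.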