I have a Reactor Kafka project that consumes messages from a Kafka topic, transforms them, and then writes them to another topic.
public Flux<String> consume(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .doOnNext(s -> sendToKafka(s, destTopic))
            .map(ConsumerRecord::value)
            .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
My understanding is that the offset is committed only after all the sequential steps in the reactive chain have completed successfully. Is that correct? I want to make sure the next record is not processed until the current record has been successfully sent to the destination Kafka topic.

The implementation of receiveAutoAck() is as follows:
@Override
public Flux<Flux<ConsumerRecord<K, V>>> receiveAutoAck(Integer prefetch) {
    return withHandler(AckMode.AUTO_ACK, (scheduler, handler) -> handler
            .receive()
            .filter(it -> !it.isEmpty())
            .publishOn(scheduler, preparePublishOnQueueSize(prefetch))
            .map(consumerRecords -> Flux.fromIterable(consumerRecords)
                    .doAfterTerminate(() -> {
                        for (ConsumerRecord<K, V> r : consumerRecords) {
                            handler.acknowledge(r);
                        }
                    })));
}
So each batch of ConsumerRecords is acknowledged only after its Flux has been fully processed (successfully or with an error). It is therefore not a per-record commit. Technically it cannot be per-record anyway: commits only matter so that, if our consumer application fails, we can resume from the offset where we left off. The currently active KafkaConsumer keeps a cursor in memory and does not care whether you commit or not.

If you really want "per record" behavior, see ReactiveKafkaConsumerTemplate.receive() and its KafkaReceiver.receive() delegate:
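Note that in the question's consume() method, sendToKafka() is invoked from doOnNext(), i.e. as a fire-and-forget side effect that the pipeline does not wait for. To make the send part of the chain, so the next record is not requested until the current one has been delivered, the send has to be composed into the flow. A minimal sketch, assuming a ReactiveKafkaProducerTemplate<String, String> named kafkaProducerTemplate (the question's sendToKafka() helper is presumably a wrapper around something similar):

```java
public Flux<String> consume(String destTopic) {
    return kafkaConsumerTemplate
            .receiveAutoAck()
            // concatMap preserves ordering and waits for each send Mono to
            // complete before processing the next ConsumerRecord
            .concatMap(consumerRecord -> kafkaProducerTemplate
                    .send(destTopic, consumerRecord.key(), consumerRecord.value())
                    .thenReturn(consumerRecord.value()))
            .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}
```

With concatMap the downstream demand drives the consumer one send at a time; a failed send fails the chain before further records are processed.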
/**
* Starts a Kafka consumer that consumes records from the subscriptions or partition
* assignments configured for this receiver. Records are consumed from Kafka and delivered
* on the returned Flux when requests are made on the Flux. The Kafka consumer is closed
* when the returned Flux terminates.
* <p>
* Every record must be acknowledged using {@link ReceiverOffset#acknowledge()} in order
* to commit the offset corresponding to the record. Acknowledged records are committed
* based on the configured commit interval and commit batch size in {@link ReceiverOptions}.
* Records may also be committed manually using {@link ReceiverOffset#commit()}.
*
* @return Flux of inbound receiver records that are committed only after acknowledgement
*/
default Flux<ReceiverRecord<K, V>> receive() {
If you want to control the commit behavior, you need to disable auto-commit like this:
ReceiverOptions.create()
.commitInterval(Duration.ZERO)
.commitBatchSize(0)
Then you need to commit after the record has been processed:
final ReceiverOptions<String, String> receiverOptions = ReceiverOptions.<String, String>create()
        .commitInterval(Duration.ZERO)
        .commitBatchSize(0)
        .subscription(List.of("mytopic"));

sender.send(KafkaReceiver.create(receiverOptions)
                .receive()
                .map(m -> SenderRecord.create(transform(m.key(), m.value()), m.receiverOffset()))) // transform the data
        .doOnNext(m -> m.correlationMetadata().commit().block()); // synchronous commit after the record is successfully delivered
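If calling block() inside doOnNext() is a concern (blocking on a Reactor thread is generally discouraged), the commit Mono can be chained into the pipeline instead. A sketch under the same assumptions as the snippet above (sender and transform are not defined here):

```java
// Non-blocking variant (sketch): chain the commit Mono instead of block().
// correlationMetadata() carries the ReceiverOffset attached via SenderRecord.create(),
// and concatMap ensures the commit of record N completes before record N+1
// is emitted downstream.
sender.send(KafkaReceiver.create(receiverOptions)
                .receive()
                .map(m -> SenderRecord.create(transform(m.key(), m.value()), m.receiverOffset())))
        .concatMap(result -> result.correlationMetadata().commit().thenReturn(result))
        .subscribe();
```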