从Flux消耗时顺序调用非阻塞操作,包括重试



因此,我的用例是在使用Project Reactor以反应式编程时,在Spring Webflux应用程序中使用Kafka的消息,并按照从Kafka接收消息的顺序对每个消息执行非阻塞操作。该系统还应该能够自行恢复。

以下是设置为使用的代码片段:

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.map(this::performAction)
.flatMapSequential(receiverRecordMono -> receiverRecordMono)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();

正如你所看到的,我所做的是:从Kafka中获取消息,将其转换为一个用于新目的地的对象,然后将其发送到目的地,然后确认偏移量,将消息标记为已消费和已处理。至关重要的是,要按照与从Kafka消费的消息相同的顺序确认偏移,这样我们就不会将偏移移动到未完全处理的消息之外(包括向目的地发送一些数据)。因此,我使用flatMapSequential来确保这一点。

为了简单起见,我们假设transformToOutputFormat()方法是一个恒等式变换。

public ReceiverRecord<Integer, DataDocument> transformToOutputFormat(ReceiverRecord<Integer, DataDocument> record) {
return record;
}

performAction()方法需要在网络上做一些事情,比如调用HTTPRESTneneneba API。因此,适当的API返回一个Mono,这意味着需要订阅链。此外,我需要通过此方法返回ReceiverRecord,以便在上面的flatMapSequence()运算符中确认偏移量。因为我需要订阅Mono,所以我使用上面的flatMapSequential。如果没有,我本可以用map来代替。

public Mono<ReceiverRecord<Integer, DataDocument>> performAction(ReceiverRecord<Integer, DataDocument> record) {
return Mono.just(record)
.flatMap(receiverRecord ->
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
)
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.then(Mono.just(record));

在这种方法中,我有两个相互矛盾的需求:1.订阅进行HTTP调用的链2.返回接收器记录的

使用flatMap()意味着我的返回类型变为Mono。在同一位置使用doOnNext()将在链中保留ReceiverRecord,但不允许自动订阅HttpClient响应。

我不能在asString()之后添加.subscribe(),因为我想等到完全收到HTTP响应后再确认偏移量。

我也不能使用.block(),因为它在并行线程上运行。

因此,我需要欺骗并从方法范围返回record对象。

另一件事是,在performAction内部重试时,它会切换线程。由于flatMapSequential()在外部流量中急切地订阅每个Mono,这意味着虽然可以保证按顺序确认偏移,但我们不能保证performAction中的HTTP调用将按相同顺序执行。

所以我有两个问题。

  1. 是否可以以自然的方式返回record,而不是返回方法范围对象
  2. 是否可以确保HTTP调用和偏移确认都以与正在执行这些操作的消息相同的顺序执行

这是我提出的解决方案。

Flux<ReceiverRecord<Integer, DataDocument>> messages = Flux.defer(() -> {
KafkaReceiver<Integer, DataDocument> receiver = KafkaReceiver.create(options);
return receiver.receive();
});
messages.map(this::transformToOutputFormat)
.delayUntil(this::performAction)
.doOnNext(record -> record.receiverOffset().acknowledge())
.doOnError(error -> logger.error("Error receiving record", error))
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5))
.subscribe();

我所做的不是使用flatMapSequential来订阅performAction Mono并保留序列,而是将Kafka接收器对更多消息的请求延迟到执行该操作。这就实现了我所需要的一次一个的处理。

因此,performAction不需要返回ReceiverRecord的Mono。我还将其简化为以下内容:

public Mono<String> performAction(ReceiverRecord<Integer, DataDocument> record) {
HttpClient.create()
.port(3000)
.get()
.uri("/makeCall?data=" + receiverRecord.value().getData())
.responseContent()
.aggregate()
.asString()
.retryBackoff(100, Duration.ofSeconds(5), Duration.ofMinutes(5));
}

相关内容

最新更新