Reactive Kafka Receiver可以与非Reactive Elasticsearch客户端一起工作吗



下面是一个示例代码,它使用reactor kafka并从主题(带有重试逻辑(中读取数据,该主题具有通过非反应生产者发布的记录。在我的doOnNext()消费者中,我使用的是非反应式弹性搜索客户端,它对索引中的记录进行索引。因此,我有几个问题仍然不清楚:

  1. 我知道消费者和生产者是独立的解耦系统,但是否建议也有消费者是反应性的反应性生产者
  2. 如果我使用的是非反应性的东西,在本例中为Elasticsearch客户端org.elasticsearch.client.RestClient,则执行"反应性">的代码工作?如果有或没有,我该如何测试它?(所谓"反应性">,我指的是它的非阻塞IO部分,即如果我产生了三个反应性消费者,其中一个由于某种原因是潜在的,则线程应被解除阻塞并用于其他反应性消费者(
  3. 一般来说,问题是,如果我用响应式客户端包装一些API,API也应该是响应式的吗

public Disposable consumeRecords() {
long maxAttempts = 3, duration = 10;
RetryBackoffSpec retrySpec = Retry.backoff(maxAttempts, Duration.ofSeconds(duration)).transientErrors(true);
Consumer<ReceiverRecord<K, V>> doOnNextConsumer = x -> {
// use non-reactive elastic search client and index record x
};
return KafkaReceiver.create(receiverOptions)
.receive()
.doOnNext(record -> {
try {
// calling the non-reactive consumer
doOnNextConsumer.accept(record);
} catch (Exception e) {
throw new ReceiverRecordException(record, e);
}
record.receiverOffset().acknowledge();
})
.doOnError(t -> log.error("Error occurred: ", t))
.retryWhen(retrySpec)
.onErrorContinue((e, record) -> {
ReceiverRecordException receiverRecordException = (ReceiverRecordException) e;
log.error("Retries exhausted for: " + receiverRecordException);
receiverRecordException.getRecord().receiverOffset().acknowledge();
})
.repeat()
.subscribe();
}

对此有所了解。

Reactive KafkaReceiver会在内部调用一些API;如果API是阻塞API;反应性";非阻塞IO将不起作用,并且接收器线程将被阻塞,因为您正在调用阻塞API/非响应式API

您可以通过创建一个简单的服务器来测试这一点(它会在一段时间内阻止调用/睡眠(,并从这个接收器调用该服务器

最新更新