如何限制活动的SpringWebClient调用的数量



我有一个要求,我使用Spring Batch从SQL DB中读取一堆行(数千行(,并在将其写入Kafka主题之前调用REST服务来丰富内容。

使用Spring Reactive webClient时,如何限制活动的非阻塞服务调用的数量?在使用SpringBatch读取数据后,我是否应该以某种方式在循环中引入Flux?

(我理解delayElements的用法,它有不同的用途,比如当一个Get Service Call带来大量数据,而你希望服务器放慢速度时——不过,在这里,我的用例有点不同,因为我有很多WebClient调用要进行,我想限制调用的数量,以避免内存不足问题,但仍然可以获得非阻塞调用的优势(.

非常有趣的问题。我思考了一下,想了几个关于如何做到这一点的想法。我会分享我的想法,希望这里有一些想法可能会帮助你进行调查。

不幸的是,我对SpringBatch并不熟悉。然而,这听起来像是一个利率限制的问题,或者说是典型的生产者-消费者问题。

因此,我们有一个生产者,它产生了如此多的信息,以至于我们的消费者无法跟上,中间的缓冲变得难以忍受。

我看到的问题是,正如您所描述的,您的SpringBatch过程不是作为流或管道工作的,而是您的反应式Web客户端

因此,如果我们能够将数据作为流读取,那么当记录开始进入管道时,这些记录将由反应式web客户端处理,并且使用背压,我们可以从生产者/数据库端控制流的流量。

生产者方

所以,我要改变的第一件事是如何从数据库中提取记录。我们需要控制一次从数据库中读取多少记录,方法是对数据检索进行分页,或者控制提取大小,然后通过背压控制其中有多少记录通过反应管道发送到下游。

因此,考虑以下(基本的(数据库数据检索,它封装在Flux中。

Flux<String> getData(DataSource ds)  {
return Flux.create(sink -> {
try {
Connection con = ds.getConnection();
con.setAutoCommit(false);
PreparedStatement stm = con.prepareStatement("SELECT order_number FROM orders WHERE order_date >= '2018-08-12'", ResultSet.TYPE_FORWARD_ONLY);
stm.setFetchSize(1000);
ResultSet rs = stm.executeQuery();
sink.onRequest(batchSize -> {
try {
for (int i = 0; i < batchSize; i++) {
if (!rs.next()) {
//no more data, close resources!
rs.close();
stm.close();
con.close();
sink.complete();
break;
}
sink.next(rs.getString(1));
}
} catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}
catch (SQLException e) {
//TODO: close resources here
sink.error(e);
}
});
}

在上面的例子中:

  • 我通过设置获取大小来控制每批读取的记录数量为1000
  • 信宿将发送订户请求的记录数量(即batchSize(,然后等待它使用背压请求更多记录
  • 当结果集中没有更多记录时,我们完成接收并关闭资源
  • 如果在任何时候发生错误,我们会发回错误并关闭资源
  • 或者,我可以使用分页来读取数据,通过在每个请求周期重新发出一个查询来简化资源的处理
  • 如果订阅被取消或释放(sink.onCancelsink.onDispose(,您也可以考虑采取措施,因为关闭连接和其他资源在这里是至关重要的

消费者端

在消费者端,您注册了一个订阅服务器,该订阅服务器当时只以1000的速度请求消息,并且只有在处理完该批消息后才会请求更多消息。

getData(source).subscribe(new BaseSubscriber<String>() {
private int messages = 0;
@Override
protected void hookOnSubscribe(Subscription subscription) {
subscription.request(1000);
}
@Override
protected void hookOnNext(String value) {
//make http request
System.out.println(value);
messages++;
if(messages % 1000 == 0) {
//when we're done with a batch
//then we're ready to request for more
upstream().request(1000);
}
}
});

在上面的示例中,当订阅开始时,它会请求第一批1000条消息。在onNext中,我们处理第一批,使用Web客户端发出http请求。

一旦批次完成,我们就会向出版商请求另一批1000,以此类推

你有它!使用背压可以控制一次有多少打开的HTTP请求。

我的示例非常初级,需要一些额外的工作才能做好生产准备,但我相信这有望提供一些可以适应您的SpringBatch场景的想法。

相关内容

  • 没有找到相关文章

最新更新