Limiting request rate with Reactor



I am loading data from a web service over REST using Project Reactor. The loading is done in parallel on multiple threads. I have started hitting the web service's rate limits, so I would like to send at most 10 requests per second to avoid those errors. How can I do that with Reactor?

Use zipWith(Mono.delayMillis(100))? Or is there a better way?

Thanks

You can use delayElements instead of that whole zipWith.
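For example (just a minimal sketch; requests and callService are placeholders, not something from the question):

Flux.fromIterable(requests)                        // requests = the items you need to load
        .delayElements(Duration.ofMillis(100))     // emit at most one element per 100 ms, i.e. ~10 per second
        .flatMap(req -> callService(req))          // callService stands for the actual REST call and returns a Mono
        .subscribe();

Note that this only spaces out the moments at which requests are started; with flatMap, slow responses can still overlap.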

One can use Flux.delayElements to process the 10-request batches every 1 s; note that if the processing takes longer than 1 second, the next batch will still be processed concurrently with the previous one (and possibly with many earlier ones as well)!
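A minimal sketch of that variant (doRequest stands for the actual blocking call; everything else is just illustration):

Flux.range(1, Integer.MAX_VALUE)                    // endless stream of batch numbers
        .delayElements(Duration.ofSeconds(1))       // one batch per second; never drops or fails
        .flatMap(batch -> Flux.range(1, 10)         // 10 requests per batch
                .flatMap(i -> Mono.fromSupplier(() -> doRequest(batch, i)) // doRequest = your blocking call
                        .subscribeOn(Schedulers.elastic())))               // one I/O thread per request, as below
        .subscribe(result -> System.out.println(result));

This is simpler, but it gives no signal when the batches start piling up, which is what the alternative below addresses.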

That is why I propose another solution, in which the 10-request batches are still processed every 1 s but, if their processing takes longer than 1 second, the next batch fails (see the overflow IllegalStateException). One can handle that failure so that the overall processing continues; I keep it out of the main example for simplicity, but a rough onErrorResume sketch for the overflow IllegalStateException is shown after the code.

The code below performs GETs on https://www.google.com/ at a rate of 10 requests per second. You will have to make additional changes to support the case where your server is not able to process all 10 requests within 1 s; for example, you could simply skip sending new requests while the server is still busy with the ones from the previous second.

@Test
void parallelHttpRequests() {
    // this is just for limiting the test running period otherwise you don't need it
    int COUNT = 2;
    // use whatever (blocking) HTTP client you like;
    // with a non-blocking client such as Spring's WebClient
    // the example changes slightly, because you would no longer
    // need subscribeOn(Schedulers.elastic())
    RestTemplate client = new RestTemplate();
    
    // exit, lock, condition are provided to allow one to run 
    // all this code in a @Test, otherwise they won't be needed
    var exit = new AtomicBoolean(false);
    var lock = new ReentrantLock();
    var condition = lock.newCondition();
    MessageFormat message = new MessageFormat("#batch: {0}, #req: {1}, resultLength: {2}");
    Flux.interval(Duration.ofSeconds(1L))
            .take(COUNT) // this is just for limiting the test running period otherwise you don't need it
            .doOnNext(batch -> debug("#batch", batch)) // just for debugging ("debug" is a simple logging helper, not shown here)
            .flatMap(batch -> Flux.range(1, 10) // 10 requests per 1 second
                            .flatMap(i -> Mono.fromSupplier(() ->
                                    client.getForEntity("https://www.google.com/", String.class).getBody()) // your request goes here (1 of 10)
                                    .map(s -> message.format(new Object[]{batch, i, s.length()})) // here the request's result will be the output of message.format(...)
                                    .doOnSubscribe(s -> debug("doOnSubscribe: #batch = " + batch + ", i = " + i)) // just for debugging
                                    .subscribeOn(Schedulers.elastic()) // one I/O thread per request
                            )
            )
            // consider using onErrorResume to handle overflow IllegalStateException
            .subscribe(
                    s -> debug("received", s), // do something with the above request's result
                    e -> {
                        // pay special attention to overflow IllegalStateException
                        debug("error", e.getMessage());
                        signalAll(exit, condition, lock);
                    },
                    () -> {
                        debug("done");
                        signalAll(exit, condition, lock);
                    }
            );
    await(exit, condition, lock);
}
// you won't need the "await" and "signalAll" methods below which
// I created only to be easier for one to run this in a test class
private void await(AtomicBoolean exit, Condition condition, Lock lock) {
    lock.lock();
    while (!exit.get()) {
        try {
            condition.await();
        } catch (InterruptedException e) {
            // the while loop re-checks the exit flag, so interruption and spurious wakeups are both safe to ignore here
            e.printStackTrace();
        }
    }
    lock.unlock();
    debug("exit");
}
private void signalAll(AtomicBoolean exit, Condition condition, Lock lock) {
    exit.set(true);
    try {
        lock.lock();
        condition.signalAll();
    } finally {
        lock.unlock();
    }
}
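For completeness, the onErrorResume handling mentioned above could look roughly like this, inserted right before the .subscribe(...) call; it only makes the stream complete gracefully instead of failing, and truly continuing the 1-second schedule after such an overflow would take more work (e.g. resubscribing):

            // handles the overflow IllegalStateException that Flux.interval signals
            // when a tick cannot be emitted because earlier batches are still running
            .onErrorResume(IllegalStateException.class, e -> {
                debug("overflow", e.getMessage()); // same debug logging helper as above
                return Flux.empty();               // swallow the error and complete
            })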
