检查是否有订阅者在RxJava中抛出异常



我正在尝试制作一个响应式RabbitMQ侦听器,它允许我们用多个订阅者处理每条消息。我们只希望在所有订阅者成功完成时才ack消息。

这是我当前的设置:

Observable
    .fromCallable(() -> {
        // Set up connection
        return consumer;
    })
    .flatMap(consumer -> Observable
        .fromCallable(consumer::nextDelivery)
        .doOnError(throwable -> {
            try {
                consumer.getChannel().getConnection().close();
            } catch (IOException ignored) { }
        })
        .repeat())
    .retryWhen(observable -> observable.delay(3, TimeUnit.SECONDS))
    .publish()
    .refCount();

这将建立一次连接,与所有订阅者共享所有消息,并在3秒后重新连接,如果这在任何地方失败,例如Rabbit不可用。

我还需要做的是acknack的消息。由于我们所有的消息处理程序都是幂等的,所以我可以在任何处理程序碰巧失败时重新请求消息,以确保每个处理程序都成功完成。

是否有办法告诉任何订阅失败?我正在考虑订阅这样的内容:

public void subscribe(Action1 action) {
    deliveries
        .flatMap(delivery -> Observable
            .just(delivery)
            .doOnNext(action)
            .doOnError(throwable -> {
                // nack
            })
            .doOnCompleted(() -> {
                // ack
            })
        )
        .subscribe();
}

但这显然是ack s或nack s对第一次失败或成功的影响。是否有办法merge特定消息的所有订阅者,然后检查错误或完成?

我还尝试过使用AtomicInteger来计算所有订阅者,然后计算成功/失败,但很明显,无论何时有人在处理期间订阅或退订,都没有简单的同步方法而不会阻塞整个处理步骤。

我也可以给每个订阅者一个Observable<Delivery>,并使他们返回一个错误或完成,类似于retryWhen(作为一种回复通道),但我没有办法生成所需数量的可观察对象,然后合并它们。

任何想法?感谢阅读!

您可以使用onErrorResumeNext来控制从管道传播的异常,并设置 ack,然后使用onComplete作为ack

这里有一个例子

Observable.just(null).map(Object::toString)
                     .doOnError(failure -> System.out.println("Error:" + failure.getCause()))
                     .retryWhen(errors -> errors.doOnNext(o -> count++)
                     .flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
                                                     Schedulers.newThread())
                     .onErrorResumeNext(t -> {
                                              System.out.println("Error after all retries:" + t.getCause());
                                              return Observable.just("I save the world!");
                                          })
                     .subscribe(s -> System.out.println(s));

您想要使用Observable.mergeDelayError,然后使用.onError*方法之一。

第一个只会在所有可观察对象完成/出错后传播错误;第二个选项允许您在处理完成后处理错误

编辑:要获得计数,请计算成功次数:

Object message = ...;
List<Action1<?>> actions = ...;
Observable.from(actions)
 .map(action ->
    Observable.defer(() -> Observable.just(action.call(message))
    .ignoreEmements()
    .cast(Integer.class)
    .switchIfEmpty(1)
    .onErrorReturnResumeNext(e->Observable.empty())
 )
 .compose(Observable::merge)
 .count();

这有点复杂,可以更清楚:拨打电话,忽略其中的错误,计算成功。

相关内容

  • 没有找到相关文章

最新更新