我正在尝试制作一个响应式RabbitMQ侦听器,它允许我们用多个订阅者处理每条消息。我们只希望在所有订阅者成功完成时才ack
消息。
这是我当前的设置:
Observable
.fromCallable(() -> {
// Set up connection
return consumer;
})
.flatMap(consumer -> Observable
.fromCallable(consumer::nextDelivery)
.doOnError(throwable -> {
try {
consumer.getChannel().getConnection().close();
} catch (IOException ignored) { }
})
.repeat())
.retryWhen(observable -> observable.delay(3, TimeUnit.SECONDS))
.publish()
.refCount();
这将建立一次连接,与所有订阅者共享所有消息,并在3秒后重新连接,如果这在任何地方失败,例如Rabbit不可用。
我还需要做的是ack
或nack
的消息。由于我们所有的消息处理程序都是幂等的,所以我可以在任何处理程序碰巧失败时重新请求消息,以确保每个处理程序都成功完成。
是否有办法告诉任何订阅失败?我正在考虑订阅这样的内容:
public void subscribe(Action1 action) {
deliveries
.flatMap(delivery -> Observable
.just(delivery)
.doOnNext(action)
.doOnError(throwable -> {
// nack
})
.doOnCompleted(() -> {
// ack
})
)
.subscribe();
}
但这显然是ack
s或nack
s对第一次失败或成功的影响。是否有办法merge
特定消息的所有订阅者,然后检查错误或完成?
我还尝试过使用AtomicInteger
来计算所有订阅者,然后计算成功/失败,但很明显,无论何时有人在处理期间订阅或退订,都没有简单的同步方法而不会阻塞整个处理步骤。
我也可以给每个订阅者一个Observable<Delivery>
,并使他们返回一个错误或完成,类似于retryWhen
(作为一种回复通道),但我没有办法生成所需数量的可观察对象,然后合并它们。
任何想法?感谢阅读!
您可以使用onErrorResumeNext来控制从管道传播的异常,并设置 ack,然后使用onComplete作为ack
这里有一个例子
Observable.just(null).map(Object::toString)
.doOnError(failure -> System.out.println("Error:" + failure.getCause()))
.retryWhen(errors -> errors.doOnNext(o -> count++)
.flatMap(t -> count > 3 ? Observable.error(t) : Observable.just(null).delay(100, TimeUnit.MILLISECONDS)),
Schedulers.newThread())
.onErrorResumeNext(t -> {
System.out.println("Error after all retries:" + t.getCause());
return Observable.just("I save the world!");
})
.subscribe(s -> System.out.println(s));
您想要使用Observable.mergeDelayError
,然后使用.onError*
方法之一。
第一个只会在所有可观察对象完成/出错后传播错误;第二个选项允许您在处理完成后处理错误。
编辑:要获得计数,请计算成功次数:
Object message = ...;
List<Action1<?>> actions = ...;
Observable.from(actions)
.map(action ->
Observable.defer(() -> Observable.just(action.call(message))
.ignoreEmements()
.cast(Integer.class)
.switchIfEmpty(1)
.onErrorReturnResumeNext(e->Observable.empty())
)
.compose(Observable::merge)
.count();
这有点复杂,可以更清楚:拨打电话,忽略其中的错误,计算成功。