RxJava 将发布主题与超时相结合



我希望能够订阅发布主题并等待结果,但不超过 1 分钟。

问题是,如果我这样做

publishsubject.timeout(1, TimeUnit.MINUTES).subscribe({result -> ... }, {error -> ... } )

即使在此之前我成功获得结果,我也总是收到错误。如何正确实施这种方法?

您很可能会收到超时异常,因为timeout要求源在指定的时间范围内继续生成项目或完成。因此,如果您只向PublishSubject发出一个onNext信号,而不是更多,您将因缺少第二个onNext调用而超时。

因此,如果您想要一个项目,请使用take(在timeout之前或之后(:

publishsubject
.timeout(1, TimeUnit.MINUTES)
.take(1)
.subscribe(result -> { /* ... */ }, error -> { /* ... */ } )

在下面的示例中,我展示了timeout的工作原理。对于每个发射,将启动一个新的超时,如果新项目在超时运行之前到达,则会重述超时,否则将引发异常。

在示例中,我们可以看到 1、2、3 在控制台打印,并且它以超时异常结束,因为第 4 个项目在 3 之后的 200 毫秒内不在这里。

正如我在下面的评论中所说,如果您知道何时可以终止publishSubject,则可以避免这种情况。例如,在第 3 项之后使用taketakeUntil或调用publishSubject.onComplete()

@Test
public void timeout() throws InterruptedException {
PublishSubject<Object> publishSubject = PublishSubject.create();
Observable<Object> timeout = publishSubject.timeout(200, TimeUnit.MILLISECONDS);
timeout
.subscribe(
e -> System.out.println(e),
error -> System.out.println("ERROR: " + error),
() -> System.out.println("complete")
);
sleep(50);
publishSubject.onNext(1);
sleep(150);
publishSubject.onNext(2);
sleep(199);
publishSubject.onNext(3);
sleep(201);
publishSubject.onNext(4);
Thread.sleep(2000);
}

相关内容

  • 没有找到相关文章

最新更新