响应式编程——如何在RxJava中创建一个以固定间隔运行的作业



我正在尝试创建一个任务,将定期查询我的数据库写入其他状态的所有结果,我想使用RxJava做到这一点。

我使用RxJava-JDBC来查询我的数据库。下面是代码:

    final Database db = Database.from(url);
    db
        .select("SELECT f1,f2 FROM mydata")
        .autoMap(MyDatum.class)
        .subscribe(
            new Action1<MyDatum>() {
                @Override
                public void call(MyDatum t) {
                    state.add(t);
                }
            },
            new Action1<Throwable>() {
                @Override
                public void call(Throwable t) {
                    L.error("Task failed", t);
                }
            },
            new Action0() {
                @Override
                public void call() {
                    state.makeAvailable();
                }
            }
        );

问题是,当我订阅时,它会工作一次,然后停止。所以我使用Observable.interval,并有这个工作:

    Observable
        .interval(10, TimeUnit.SECONDS)
        .forEach(
            new Action1<Long>() {
                @Override
                public void call(Long arg) {
                    db
                        .select("SELECT f1,f2 FROM mydata")
                        .autoMap(MyDatum.class)
                        .subscribe(
                            new Action1<MyDatum>() {
                                @Override
                                public void call(MyDatum t) {
                                    state.add(t);
                                }
                            },
                            new Action1<Throwable>() {
                                @Override
                                public void call(Throwable t) {
                                    L.error("Task failed", t);
                                }
                            },
                            new Action0() {
                                @Override
                                public void call() {
                                    state.makeAvailable();
                                }
                            }
                        );
                }
            }
        );

但我想知道如果我没有做错了一个流嵌套在另一个。我想过使用flatMap,但是onComplete永远不会被执行,因为interval永远不会调用onComplete

我想让它进化,不仅由间隔触发,而且由传入事件触发。

我错过了什么吗?由于

doOnNextdoOnCompleted运算符在您的情况下非常有用。下面是一个示例,您可以使用这些操作符实现所描述的行为:

final Observable<MyDatum> observable = Observable.interval(10, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<MyDatum>>() {
    @Override
    public Observable<MyDatum> call(final Long counter) {
        return db.select("SELECT f1,f2 FROM mydata")
                .autoMap(MyDatum.class)
                .doOnNext(new Action1<MyDatum>() {
                    @Override
                    public void call(final MyDatum value) {
                        state.add(value);
                    }
                })
                .doOnCompleted(new Action0() {
                    @Override
                    public void call() {
                        state.makeAvailable();
                    }
                });
    }
});
final Subscription subscription = observable.subscribe();

flatMapmerge是您想使用的操作符。首先,你应该避免在操作符体中订阅可观察对象。而是使用flatMap并返回可观察对象。这将为你订阅所有发出的可观察对象。

为了手动触发查询,您可以合并在PublishSubject(文档)中,您可以调用onNext来推送事件并手动触发查询。将代码修改如下:

PublishSubject<Long> subject = PublishSubject.create();
Observable.merge(subject, Observable.timer(0, 1, TimeUnit.SECONDS))
    .flatMap(new Func1<Long, Observable<MyDatum>>() {
        @Override
        public Observable<MyDatum> call(Long arg) {
            return db
                .select("SELECT f1,f2 FROM mydata")
                .autoMap(MyDatum.class);
    }}).subscribe(
        new Action1<MyDatum>() {
            @Override
            public void call(MyDatum t) {
                state.add(t);
            }
        },
        new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                L.error("Task failed", t);
            }
        },
        new Action0() {
            @Override
            public void call() {
                state.makeAvailable();
            }
        }
    );
// you can call onNext with any value to trigger a manual query
subject.onNext(999L);
下面是一个简单的RxJava代码片段,演示了这种行为。
CountDownLatch l = new CountDownLatch(5);
PublishSubject<Long> subject = PublishSubject.create();
Observable.merge(subject, Observable.timer(0, 1, TimeUnit.SECONDS).take(3))
        .flatMap((Long arg) -> {
            System.out.println("tick: " + arg);
            l.countDown();
            return Observable.just(arg+10);
        })
        .forEach(System.out::println);
l.await(1, TimeUnit.SECONDS);
subject.onNext(999L);
l.await();

tick: 0
10
tick: 999
1009
tick: 1
11
tick: 2
12

相关内容

  • 没有找到相关文章

最新更新