我正在尝试创建一个任务,将定期查询我的数据库写入其他状态的所有结果,我想使用RxJava做到这一点。
我使用RxJava-JDBC来查询我的数据库。下面是代码:
final Database db = Database.from(url);
db
.select("SELECT f1,f2 FROM mydata")
.autoMap(MyDatum.class)
.subscribe(
new Action1<MyDatum>() {
@Override
public void call(MyDatum t) {
state.add(t);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable t) {
L.error("Task failed", t);
}
},
new Action0() {
@Override
public void call() {
state.makeAvailable();
}
}
);
问题是,当我订阅时,它会工作一次,然后停止。所以我使用Observable.interval
,并有这个工作:
Observable
.interval(10, TimeUnit.SECONDS)
.forEach(
new Action1<Long>() {
@Override
public void call(Long arg) {
db
.select("SELECT f1,f2 FROM mydata")
.autoMap(MyDatum.class)
.subscribe(
new Action1<MyDatum>() {
@Override
public void call(MyDatum t) {
state.add(t);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable t) {
L.error("Task failed", t);
}
},
new Action0() {
@Override
public void call() {
state.makeAvailable();
}
}
);
}
}
);
但我想知道如果我没有做错了一个流嵌套在另一个。我想过使用flatMap
,但是onComplete
永远不会被执行,因为interval
永远不会调用onComplete
。
我想让它进化,不仅由间隔触发,而且由传入事件触发。
我错过了什么吗?由于
doOnNext
和doOnCompleted
运算符在您的情况下非常有用。下面是一个示例,您可以使用这些操作符实现所描述的行为:
final Observable<MyDatum> observable = Observable.interval(10, TimeUnit.SECONDS).flatMap(new Func1<Long, Observable<MyDatum>>() {
@Override
public Observable<MyDatum> call(final Long counter) {
return db.select("SELECT f1,f2 FROM mydata")
.autoMap(MyDatum.class)
.doOnNext(new Action1<MyDatum>() {
@Override
public void call(final MyDatum value) {
state.add(value);
}
})
.doOnCompleted(new Action0() {
@Override
public void call() {
state.makeAvailable();
}
});
}
});
final Subscription subscription = observable.subscribe();
flatMap
和merge
是您想使用的操作符。首先,你应该避免在操作符体中订阅可观察对象。而是使用flatMap并返回可观察对象。这将为你订阅所有发出的可观察对象。
为了手动触发查询,您可以合并在PublishSubject
(文档)中,您可以调用onNext
来推送事件并手动触发查询。将代码修改如下:
PublishSubject<Long> subject = PublishSubject.create();
Observable.merge(subject, Observable.timer(0, 1, TimeUnit.SECONDS))
.flatMap(new Func1<Long, Observable<MyDatum>>() {
@Override
public Observable<MyDatum> call(Long arg) {
return db
.select("SELECT f1,f2 FROM mydata")
.autoMap(MyDatum.class);
}}).subscribe(
new Action1<MyDatum>() {
@Override
public void call(MyDatum t) {
state.add(t);
}
},
new Action1<Throwable>() {
@Override
public void call(Throwable t) {
L.error("Task failed", t);
}
},
new Action0() {
@Override
public void call() {
state.makeAvailable();
}
}
);
// you can call onNext with any value to trigger a manual query
subject.onNext(999L);
下面是一个简单的RxJava代码片段,演示了这种行为。
CountDownLatch l = new CountDownLatch(5);
PublishSubject<Long> subject = PublishSubject.create();
Observable.merge(subject, Observable.timer(0, 1, TimeUnit.SECONDS).take(3))
.flatMap((Long arg) -> {
System.out.println("tick: " + arg);
l.countDown();
return Observable.just(arg+10);
})
.forEach(System.out::println);
l.await(1, TimeUnit.SECONDS);
subject.onNext(999L);
l.await();
tick: 0
10
tick: 999
1009
tick: 1
11
tick: 2
12