我正在尝试处理一个对象流(通过http json请求)。
观察者返回这样的项目:
" 2015-05-06T13:24:20z",foo,foo,1,2,3,foo,foo,foo
第一个项目是时间戳,然后是foo对象存储在DB中为他们提高)。
我当前的实现如下:
public void updateFoos(final CallBack callBack) {
final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();
fooService.getFoos(lastFooUpdateTimestamp)
.subscribe(new Subscriber<Object>() {
@Override
public void onCompleted() {
callBack.onSuccess();
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Object o) {
if (o instanceof String) {
localStorage.setLastFooUpdateTimestamp((String) o);
}
if (o instanceof Foo) {
databaseManager.save((Foo) o);
}
}
});
}
有很多问题:
- 检查的实例不是很rxjavay,有更好的方法吗?
- 无论如何,时间戳始终是第一个字段,可以干净地表达?
- 我也想批量DB插入物,因此拥有一个单独的块来处理也可以批处理它们的foo对象。
- 是否有更好的设计,我会按类型发出多个可观察到的东西?但是,我该如何订阅多个观察者?
这是一个示例,如何使用rxjava:
public class MultikindSource {
enum ValueType {
TIMESTAMP,
NUMBER,
FOO
}
static final class Foo { }
static Observable<Object> source(String timestamp) {
return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
1, 2, 3, new Foo()));
}
public static void main(String[] args) {
Func1<Object, ValueType> keySelector = o -> {
if (o instanceof String) {
return ValueType.TIMESTAMP;
} else
if (o instanceof Foo) {
return ValueType.FOO;
}
return ValueType.NUMBER;
};
AtomicReference<String> lastTimestamp = new AtomicReference<>(
"2015-05-08T11:38:00.000Z");
source(lastTimestamp.get())
.groupBy(keySelector)
.flatMap(g -> {
if (g.getKey() == ValueType.TIMESTAMP) {
g.subscribe(v -> {
System.out.println("Updating timestamp to " + v);
lastTimestamp.set((String)v);
});
} else
if (g.getKey() == ValueType.FOO) {
g.buffer(2).subscribe(v ->
System.out.println("Batch inserting " + v));
} else {
g.subscribe(v ->
System.out.println("Got some number: " + v));
}
return Observable.just(1);
}).count().subscribe(v ->
System.out.println("Got " + v + " kinds of events."));
}
}
本质上,您可以通过一些枚举对源数据进行分组,然后将其链接到这些组并订阅以执行工作。