使用rxjava处理变化的对象流



我正在尝试处理一个对象流(通过http json请求)。

观察者返回这样的项目:

" 2015-05-06T13:24:20z",foo,foo,1,2,3,foo,foo,foo

第一个项目是时间戳,然后是foo对象存储在DB中为他们提高)。

我当前的实现如下:

public void updateFoos(final CallBack callBack) {
    final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();
    fooService.getFoos(lastFooUpdateTimestamp)
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    callBack.onSuccess();
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(Object o) {
                    if (o instanceof String) {
                        localStorage.setLastFooUpdateTimestamp((String) o);
                    }
                    if (o instanceof Foo) {
                        databaseManager.save((Foo) o);
                    }
                }
            });
}

有很多问题:

  1. 检查的实例不是很rxjavay,有更好的方法吗?
  2. 无论如何,时间戳始终是第一个字段,可以干净地表达?
  3. 我也想批量DB插入物,因此拥有一个单独的块来处理也可以批处理它们的foo对象。
  4. 是否有更好的设计,我会按类型发出多个可观察到的东西?但是,我该如何订阅多个观察者?

这是一个示例,如何使用rxjava:

public class MultikindSource {
    enum ValueType {
        TIMESTAMP,
        NUMBER,
        FOO
    }
    static final class Foo { }
    static Observable<Object> source(String timestamp) {
        return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
            1, 2, 3, new Foo()));
    }
    public static void main(String[] args) {
        Func1<Object, ValueType> keySelector = o -> {
            if (o instanceof String) {
                return ValueType.TIMESTAMP;
            } else
            if (o instanceof Foo) {
                return ValueType.FOO;
            }
            return ValueType.NUMBER;
        };
        AtomicReference<String> lastTimestamp = new AtomicReference<>(
            "2015-05-08T11:38:00.000Z");
        source(lastTimestamp.get())
        .groupBy(keySelector)
        .flatMap(g -> {
            if (g.getKey() == ValueType.TIMESTAMP) {
                g.subscribe(v -> {
                    System.out.println("Updating timestamp to " + v);
                    lastTimestamp.set((String)v);
                });
            } else
            if (g.getKey() == ValueType.FOO) {
                g.buffer(2).subscribe(v -> 
                    System.out.println("Batch inserting " + v));
            } else {
                g.subscribe(v -> 
                    System.out.println("Got some number: " + v));
            }
            return Observable.just(1);
        }).count().subscribe(v -> 
            System.out.println("Got " + v + " kinds of events."));
    }
}

本质上,您可以通过一些枚举对源数据进行分组,然后将其链接到这些组并订阅以执行工作。

最新更新