我正在使用rxjava 1.2.2.
从我的列表开始,我想填充一个缓冲区,然后过滤缓冲区的最大项目的方式,例如,每5秒只有最大项目过滤器必须发出。
Observable<Item> EventEmitter = Observable.from(itemsList);
Observable<List<Item>> tapBufferEmitter = tapEventEmitter.buffer(5, TimeUnit.SECONDS);
MathObservable.from(tapBufferEmitter).max(new Comparator<List<Item>>() {
@Override
public int compare(List<Item> o1, List<Item> o2) {
int m1 =o1.getVal();
int m2 = o1.getVal();
if (m1 == m2){
return 0;
} else if (m1 > m2){
return 1;
} else {
return -1;
}
}
}).subscribeOn(Schedulers.from(executor1))
.subscribe(s -> {
System.out.println("Called thread: " + Thread.currentThread().getId());
syso.("Max Item is:" + s.getId());
}, e -> System.out.println(e.getMessage()));
但是上面的代码片段当然不起作用。我不想比较list o1和o2,我只想比较同一个list中的元素
最大操作符是正确的选择吗?请注意,我不是在比较整数,而是在比较项。每个项目都是一个有固定字段的头。我想要这个字段的最大值。
如何从缓冲区中选择最大值?由于
我写了一个例子,说明如何使用mathoobservable。马克斯算子。请注意,我确实使用了window而不是buffer,因为buffer会返回一个List,而window会给我一个Observable,我可以用flatMap和mathoobservablen来重用它。然后mathoobservable会计算给定窗口(包含5个元素的observable)的最大值。
Gradle:
compile 'io.reactivex:rxjava:1.2.1'
compile 'io.reactivex:rxjava-math:1.0.0'
窗口:
@Test
public void windowMaxTest() throws Exception {
Observable<Integer> just = Observable.just(10, 9, 8, 4, 7, 5, 6, 8, 4, 3);
Observable<Integer> integerObservable1 = just.window(5)
.flatMap(integerObservable -> {
return MathObservable.max(integerObservable);
});
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
integerObservable1.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertValues(10, 8);
}
缓冲:@Test
public void bufferMaxTest() throws Exception {
Observable<Integer> just = Observable.just(10, 9, 8, 4, 7, 5, 6, 8, 4, 3);
Observable<Integer> integerObservable1 = just.buffer(5)
.flatMap(integerObservable -> {
return MathObservable.max(Observable.from(integerObservable));
});
TestSubscriber<Integer> testSubscriber = new TestSubscriber<>();
integerObservable1.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertValues(10, 8);
}
自定义对象::
class Item {
public int value;
public Item(int value) {
this.value = value;
}
}
@Test
public void test3214() throws Exception {
final Item max1 = new Item(3);
final Item max2 = new Item(6);
final List<Item> myListOfItem = Arrays.asList(new Item(1), new Item(2), max1, new Item(4), new Item(5), max2);
Observable<Item> itemObservable1 = Observable
.from(myListOfItem)
.buffer(3)
.flatMap(itemObservable -> {
Observable<Item> from = Observable.from(itemObservable);
return MathObservable.from(from)
.max((item, t1) -> {
return Integer.compare(item.value, t1.value);
});
});
TestSubscriber<Item> testSubscriber = new TestSubscriber<>();
itemObservable1.subscribe(testSubscriber);
testSubscriber.awaitTerminalEvent();
testSubscriber.assertValues(max1, max2);
}
一般来说,.reduce()是获取min/max/avg/sum等的标准选择。
reduce(0, /*return Math.max(lhs, rhs)*/)
所以整个操作就像这样-
source.window(/*...*/).flatMap(/*return .reduce(0, /*return Math.max(lhs, rhs)*/))