我有一个可观察的对象,它会发出字符串,我想按第一个字符对它们进行分组。用groupBy
这样做很容易:
Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
Observable<List<String>> groupedRows = rows.groupBy(new Func1<String, Character>() {
public Character call(String row) {
return row.charAt(0);
}
}).flatMap(new Func1<GroupedObservable<Character, String>, Observable<List<String>>>() {
public Observable<List<String>> call(GroupedObservable<Character, String> group) {
return group.toList();
}
});
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
public void call(List<String> group) {
System.out.println(group);
}
});
// Output:
// [aa, ab, ac]
// [bb, bc]
// [cc]
但这对我的目的来说并不好,因为groupBy
只在可观测源发射onComplete
时完成每个组。因此,如果我有很多行,它们将完全收集在内存中,并且只在最后一行"刷新"并写入输出。
我需要类似buffer
算子的东西,但有我自己的函数,它表示每个群的边界。我是这样实现的(知道行总是按字母顺序排列):
Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
ConnectableObservable<String> connectableRows = rows.publish();
Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
private char lastChar = 0;
public Boolean call(String row) {
char currentChar = row.charAt(0);
boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
lastChar = currentChar;
return isNewGroup;
}
});
Observable<List<String>> groupedRows = connectableRows.buffer(boundarySelector);
connectableRows.connect();
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
public void call(List<String> group) {
System.out.println(group);
}
});
// Output:
// []
// []
// []
它不起作用,因为boundarySelector
正在"吃掉"行,我觉得这很奇怪,因为我专门用ConnectableObservable
来表示在rows
开始发射之前我需要两个订阅者(boundarySelector
和groupedRows
)。
奇怪的是,如果我将rows
延迟1秒,那么这段代码就可以工作了。
所以问题是:如何使用自己的边界函数对任意数量的行进行分组?
Observable<Integer> source = Observable.range(0, 100);
source
.groupBy(k -> k / 10)
.publish(groups -> groups
.map(g -> Pair.of(g.getKey(), g.takeUntil(groups)))
.flatMap(kv ->
kv.second
.doOnNext(v -> System.out.println(kv.first + " value " + v))
.doOnCompleted(() -> System.out.println(kv.first + " done"))
))
.subscribe()
;
找到了一种使用buffer
:的方法
Observable<String> rows = Observable.just("aa", "ab", "ac", "bb", "bc", "cc");
ConnectableObservable<String> connectableRows = rows.publish();
Observable<String> boundarySelector = connectableRows.filter(new Func1<String, Boolean>() {
private char lastChar = 0;
public Boolean call(String row) {
char currentChar = row.charAt(0);
boolean isNewGroup = lastChar != 0 && (currentChar != lastChar);
lastChar = currentChar;
return isNewGroup;
}
});
Observable<List<String>> groupedRows = connectableRows
.refCount()
.buffer(boundarySelector);
groupedRows.toBlocking().forEach(new Action1<List<String>>() {
public void call(List<String> group) {
System.out.println(group);
}
});
// Output:
// [aa, ab, ac]
// [bb, bc]
// [cc]