我正在尝试创建自己的RxJava操作符,称为groupByUntilChanged()
。它的功能类似于groupBy()
,但假设排放是基于密钥的顺序。因此,当键值改变时,它完成了GroupedObservable
,并为下一个键移动到下一个GroupedObservable
。
这是我到目前为止的工作。我使用每个String
的第一个字母作为密钥。这似乎工作得很好,直到我扔一个"A"String
在结束。
public class Test {
public static void main(String[] args) {
Observable<String> items =
Observable.just("Alpha","Adam","Apple","Beta","Brick","Bridge","Bat","Gamma","Gorilla","Axe");
Func1<String,String> keyExtractor = s -> s.substring(0,1);
items.compose(orderedGroupBy(keyExtractor))
.flatMap(grp -> grp.toList())
.subscribe(System.out::println);
}
public static <T,K> Observable.Transformer<T,GroupedObservable<K,T>> orderedGroupBy(Func1<T,K> keySelector) {
return obs -> obs.groupBy(keySelector)
.map(grp ->
GroupedObservable.from(grp.getKey(),grp.takeWhile(t -> keySelector.call(t).equals(grp.getKey())))
);
}
}
我得到这样的输出:
[Alpha, Adam, Apple, Axe]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]
当我真的想要这个:
[Alpha, Adam, Apple]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]
[Axe]
我该怎么做才能使一组有序的排放在键变化时将onComplete()
转换为GroupedObservable
?
跨groupBy
操作符协调GroupedObservable
的完成是一件相当棘手的事情(尽管在您的情况下,同步处理可能会启用其他解决方案)。出于这个原因,groupBy
有一个重载,允许您指定mapFactory
。如果你使用Guava CacheBuilder
作为groupBy
重载的javadoc,那么你可以为地图指定一个最大大小为1和你想要的行为结果:
Func1<String,String> keySelectory = s -> s.substring(0,1);
Func1<String,String> elementSelectory = s -> s;
Func1<Action1<String>, Map<String, String>> mapFactory =
action ->
CacheBuilder.newBuilder()
.maximumSize(1)
.removalListener(notification ->
action.call(notification.getKey()))
.<String, String> build().asMap();
items.groupBy(keySelector, elementSelector, mapFactory)
.flatMap(grp -> grp.toList())
.subscribe(System.out::println);
这类问题最好使用自定义操作符来解决——依赖于状态的转换(这里是已经处理的项及其键)不是响应式方法的最佳目标,通常需要Subjects
。使用内置操作符,冷可观察对象的(详细)解决方案如下:
public static <K, T> Observable.Transformer<T, GroupedObservable<K, T>> groupByUntilChanged(Func1<? super T, ? extends K> keyExtractor) {
return observable -> groupByUntilChanged(observable, keyExtractor);
}
static <K, T> Observable<GroupedObservable<K, T>> groupByUntilChanged(Observable<T> itemsStream,
Func1<? super T, ? extends K> keyExtractor) {
/*keys according to keyExtractor */
Observable<K> keysStream = itemsStream.distinctUntilChanged(keyExtractor).map(keyExtractor);
Func1<K, Func1<T, Boolean>> itemByKey = key -> item -> key.equals(keyExtractor.call(item));
/*predicate functions to match sub stream specified by key extractor*/
Observable<Func1<T, Boolean>> itemsByKeyFuncStream = keysStream.map(itemByKey);
/*stream chunks are processed sequentially, some kind of state bookkeeping is needed: let it be the number of
already processed items */
BehaviorSubject<Integer> skipCountStream = BehaviorSubject.create(0);
Observable<GroupedObservable<K, T>> groupByUntilChangedStream = itemsByKeyFuncStream.concatMap(takeF ->
/*skip already processed items, take while key extractor predicate is true*/
skipCountStream.first().map(count -> itemsStream.skip(count).takeWhile(takeF)))
.doOnNext(subItems ->
/*once group is ready, increase number of already processed items*/
subItems.count()
.flatMap(subItemsSize -> skipCountStream.first().map(allSize -> allSize + subItemsSize))
.subscribe(skipCountStream::onNext))
/*convert to GroupedObservable*/
.zipWith(keysStream, (obs, key) -> GroupedObservable.from(key, obs));
return groupByUntilChangedStream;
}
用
进行测试Observable<String> itemsStream =
Observable.just("Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
Func1<String, String> keyExtractor = s -> s.substring(0, 1);
Observable<GroupedObservable<String, String>> groupByUntilChangedStream = itemsStream.compose(groupByUntilChanged(keyExtractor));
/*groups starting with "A"*/
groupByUntilChangedStream
.filter(grouped -> grouped.getKey().equals("A"))
.flatMap(Observable::toList)
.defaultIfEmpty(Collections.emptyList())
.subscribe(System.out::println);
结果
[Alpha, Adam, Apple]
[Axe]