创建GroupByUntilChanged操作符



我正在尝试创建自己的RxJava操作符,称为groupByUntilChanged()。它的功能类似于groupBy(),但假设排放是基于密钥的顺序。因此,当键值改变时,它完成了GroupedObservable,并为下一个键移动到下一个GroupedObservable

这是我到目前为止的工作。我使用每个String的第一个字母作为密钥。这似乎工作得很好,直到我扔一个"A"String在结束。

public class Test {
    public static void main(String[] args) {
        Observable<String> items =
                Observable.just("Alpha","Adam","Apple","Beta","Brick","Bridge","Bat","Gamma","Gorilla","Axe");
        Func1<String,String> keyExtractor = s -> s.substring(0,1);
        items.compose(orderedGroupBy(keyExtractor))
                .flatMap(grp -> grp.toList())
                .subscribe(System.out::println);
    }
    public static <T,K> Observable.Transformer<T,GroupedObservable<K,T>> orderedGroupBy(Func1<T,K> keySelector) {
        return obs -> obs.groupBy(keySelector)
                .map(grp ->
                      GroupedObservable.from(grp.getKey(),grp.takeWhile(t -> keySelector.call(t).equals(grp.getKey())))
                );
    }
}

我得到这样的输出:

[Alpha, Adam, Apple, Axe]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]

当我真的想要这个:

[Alpha, Adam, Apple]
[Beta, Brick, Bridge, Bat]
[Gamma, Gorilla]
[Axe]

我该怎么做才能使一组有序的排放在键变化时将onComplete()转换为GroupedObservable ?

groupBy操作符协调GroupedObservable的完成是一件相当棘手的事情(尽管在您的情况下,同步处理可能会启用其他解决方案)。出于这个原因,groupBy有一个重载,允许您指定mapFactory。如果你使用Guava CacheBuilder作为groupBy重载的javadoc,那么你可以为地图指定一个最大大小为1和你想要的行为结果:

Func1<String,String> keySelectory = s -> s.substring(0,1);
Func1<String,String> elementSelectory = s -> s;
Func1<Action1<String>, Map<String, String>> mapFactory =
   action -> 
     CacheBuilder.newBuilder()
       .maximumSize(1)
       .removalListener(notification ->
          action.call(notification.getKey()))
     .<String, String> build().asMap();
items.groupBy(keySelector, elementSelector, mapFactory)
            .flatMap(grp -> grp.toList())
            .subscribe(System.out::println);

这类问题最好使用自定义操作符来解决——依赖于状态的转换(这里是已经处理的项及其键)不是响应式方法的最佳目标,通常需要Subjects。使用内置操作符,冷可观察对象的(详细)解决方案如下:

public static <K, T> Observable.Transformer<T, GroupedObservable<K, T>> groupByUntilChanged(Func1<? super T, ? extends K> keyExtractor) {
    return observable -> groupByUntilChanged(observable, keyExtractor);
}
static <K, T> Observable<GroupedObservable<K, T>> groupByUntilChanged(Observable<T> itemsStream,
                                                                      Func1<? super T, ? extends K> keyExtractor) {
    /*keys according to keyExtractor */
    Observable<K> keysStream = itemsStream.distinctUntilChanged(keyExtractor).map(keyExtractor);
    Func1<K, Func1<T, Boolean>> itemByKey = key -> item -> key.equals(keyExtractor.call(item));
    /*predicate functions to match sub stream specified by key extractor*/
    Observable<Func1<T, Boolean>> itemsByKeyFuncStream = keysStream.map(itemByKey);
    /*stream chunks are processed sequentially, some kind of state bookkeeping is needed: let it be the number of
      already processed items */
    BehaviorSubject<Integer> skipCountStream = BehaviorSubject.create(0);
    Observable<GroupedObservable<K, T>> groupByUntilChangedStream = itemsByKeyFuncStream.concatMap(takeF ->
            /*skip already processed items, take while key extractor predicate is true*/
            skipCountStream.first().map(count -> itemsStream.skip(count).takeWhile(takeF)))
            .doOnNext(subItems ->
                    /*once group is ready, increase number of already processed items*/
                    subItems.count()
                            .flatMap(subItemsSize -> skipCountStream.first().map(allSize -> allSize + subItemsSize))
                            .subscribe(skipCountStream::onNext))
             /*convert to GroupedObservable*/
            .zipWith(keysStream, (obs, key) -> GroupedObservable.from(key, obs));
    return groupByUntilChangedStream;
}

进行测试
Observable<String> itemsStream =
            Observable.just("Alpha", "Adam", "Apple", "Beta", "Brick", "Bridge", "Bat", "Gamma", "Gorilla", "Axe");
    Func1<String, String> keyExtractor = s -> s.substring(0, 1);
    Observable<GroupedObservable<String, String>> groupByUntilChangedStream = itemsStream.compose(groupByUntilChanged(keyExtractor));
    /*groups starting with "A"*/
    groupByUntilChangedStream
            .filter(grouped -> grouped.getKey().equals("A"))
            .flatMap(Observable::toList)
            .defaultIfEmpty(Collections.emptyList())
            .subscribe(System.out::println);

结果

[Alpha, Adam, Apple]
[Axe]

相关内容

  • 没有找到相关文章

最新更新