假设我们有一个合作游戏,玩家获得和失去得分点。任何玩家都可以在任何时候加入游戏。一旦加入,玩家就不会离开(为了简单)。球队的总得分只是每个球员在任何时刻的得分点数的总和。玩家应该能够看到自己的当前得分和球队的总得分。
所以,我们有一个所有球员的分数绑定到时间戳的流。
Time: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
Stream: A5 A4 B2 A3 A4B4 A9 K5 B6 A8 B10K9 A12
字母代表球员,数字代表得分。像A4B4
这样的值意味着两个玩家的得分同时发生了变化。在groupBy
之后,我们有三个流:每个玩家一个。注意,玩家的集合不是预定义的。
Time: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
Alex: 5 4 3 4 9 8 12
Ben: 2 4 6 10
Katie: 5 9
我期望得到的是每次分数变化的总和流:
Time: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
Total: 5 4 6 5 8 13 18 20 19 27 31
,微不足道。但是解决方案变得复杂了。
这是一个输入流。
Observable< Record > input = Observable.from( new Record[] {
new Record( "Alex", 0, 5 ),
new Record( "Alex", 1, 4 ),
new Record( "Ben", 5, 2 ),
new Record( "Alex", 6, 3 ),
new Record( "Alex", 7, 4 ),
new Record( "Ben", 7, 4 ),
new Record( "Alex", 8, 9 ),
new Record( "Katie", 12, 5 ),
new Record( "Ben", 14, 6 ),
new Record( "Alex", 15, 8 ),
new Record( "Katie", 16, 9 ),
new Record( "Ben", 16, 10 ),
new Record( "Alex", 17, 12 )
} )
.delay( e -> Observable.interval( e.timestamp, TimeUnit.SECONDS, scheduler ) )
.doOnCompleted( latch::countDown )
.share();
记录是一个简单的元组:Record( String player, int timestamp, double score )
.
让我们把它分成几组:
Observable< Observable< Record > > output = input
.groupBy( e -> e.player )
.map( g -> g.cache() )
一个标准的toList
方法期望一个Observable完成。但是只有当输入流完成时,群组流才会完成(任何玩家都可以在任何时候加入游戏)。所以需要一个新的变压器。它在onNext
调用时返回一个列表,而不是onComplete
。
.compose( new ToListOnNextTransformer() )
然后,每当一个新的组被发出时(每次有新的玩家加入游戏时),我们都会生成一个新的Observable。这个Observable返回所有玩家最近得分的总和。
.map( eachPlayerRecordsList -> Observable.combineLatest( eachPlayerRecordsList, ( Object... eachPlayerRecordsArray ) -> {
double total = Arrays
.stream( eachPlayerRecordsArray )
.mapToDouble( e -> ( ( Record )e ).score )
.sum();
int timestamp = Arrays
.stream( eachPlayerRecordsArray )
.mapToInt( e -> ( ( Record )e ).timestamp )
.max()
.getAsInt();
return new Record( "Total", timestamp, total );
} ) );
最后,我们输出第一组的值,直到第二个玩家加入游戏。然后我们输出第二组的值,直到第三个玩家加入等等。这正是switchOnNext
做的。
Observable.switchOnNext( outputg )
.subscribe( System.out::println, e -> { ( ( Throwable )e ).printStackTrace(); } );
自定义变压器类:
private static class ToListOnNextTransformer implements Observable.Transformer< Observable< Record >, List< Observable< Record > > > {
private final List< Observable< Record > > list = new ArrayList<>();
private final PublishSubject< List< Observable< Record > > > ret = PublishSubject.create();
@Override
public Observable< List< Observable< Record > > > call( Observable< Observable< Record > > eachPlayerRecords ) {
eachPlayerRecords.subscribe( playerRecords -> {
list.add( playerRecords );
ret.onNext( new ArrayList<>( list ) );
} );
return ret;
}
}
问题是:
- 可以简化吗? 我是不是打破了RxJava的一些规则?比如在订阅者内部使用订阅者等等。
这是一个人为的任务。我正在学习RxJava,并试图理解响应式编程。我希望有更好的方法,我只是遗漏了一些明显的东西。
测试文件的完整源代码
我不完全理解您的示例代码,但您可以通过scan
:
source.scan(0, (a, b) -> a + b)
给定一个人物和他们的分数的事件流,你可以创建两个组:
ConnectableObservable<Record> co = input.publish(); // replay, cache, etc.
Observable<Pair<String, Integer>> userScores = co
.groupBy(r -> r.name)
.flatMap(g -> just(Pair.of(g.getKey(), g.scan(0, (a, b) -> a + b))));
Observable<Pair<String, Integer>> teamScores = co
.groupBy(r -> r.team)
.flatMap(g -> just(Pair.of(g.getKey(), g.scan(0, (a, b) -> a + b))));
userScores.filter(p -> p.name.equals("Alex"))
.concatMap(p -> p.second.map(s -> Pair.of(p.first, s)))
.subscribe(System.out::println);
co.connect();