我正在接收一个无限的事件流,比如Observable<Event>
,其中Event(userId, payload)
。我必须提供功能,允许订阅用户的事件。Observable<Event>
是一个热门资源,但这并不重要,因为如果没有感兴趣的用户,我可以放弃事件。我正在尝试使用RxJava API以纯流方式实现它,但这对我来说似乎很难。我试了什么:
- 使用内置操作符,以最少的努力完成。
Observable<Event> events =; //;
events = events.publish().refCount();
Observable<Event> subscribeUser(int userId) {
return events.filter(it -> it.userId == userId);
}
工作很好,问题是它只是慢为我的情况-我不需要多播到所有订阅者,然后过滤,如果它不属于任何用户。想象一下,每秒有成千上万的用户和1000个事件——0 (n)的复杂度对我来说太大了——我认为我需要一些哈希。
PublishSubject
ConcurrentHashMap<Integer, PublishSubject<Event>> subscriptions = new ConcurrentHashMap<>();
Observable<Event> events =; //
events.subscribe(event -> /*pushing on user subject*/);
Observable<Event> subscribeUser(int userId) {
return subscriptions.putIfAbsent(userId, PublishSubject.create())
.doOnTerminate(() -> subscriptions.remove(userId))
}
这需要更多的编码(比上面的例子多得多)来管理订阅,它不是纯流,PublishSubject
也是为多个订阅者设计的,所以它需要更多的空间用于内部数组,我不需要,因为我每个主题只有一个用户。
groupBy()
操作符我可以通过userId生成GroupedObservable
流分组事件,然后我可以只.ignoreElements()
,直到没有订阅,但我不知道如何将这两个连接在一起。一方面,我有GroupedObservable<Integer, Event>
的Observable
流,然后是传入的Observable<Event> subscribeUser(int userId)
请求。我怎么能说:"嘿,从现在开始,不要因为有人对它感兴趣而删除元素"?并返回GroupedObservable
作为请求的响应?custom
Observable
这里的问题是Observable.create(subscriber -> )
订阅者没有关于它的键(userId)的信息-我也没有控制subscribe()
方法,所以我不能在那里传递我的自定义实现并在转换或以某种方式检索键。这也是不好的,因为我需要自己管理状态。
lift()
自定义操作符与4类似的问题,甚至更糟,因为我无法控制在这里创建Observable
,所以我还需要防止多个连接到源
选项2)是最直接的。选项4在编码Observable
时需要更聪明。
ConcurrentMap<Integer, Observer<Event>> observers = ...
void signal(Event event) {
var observer = observers.get(event.userId);
if (observer != null) {
observer.onNext(event);
}
}
Observable<Event> observeUser(int userId) {
return Observable.unsafeCreate(observer -> {
var remove = Disposable.fromAction(() -> observers.remove(userId));
subscriber.onSubscribe(remove);
observers.put(userId, observer);
});
}