我正在尝试实现一个使用可观察量发出更改的类。当订阅完成此可观察量时,我想发送一个启动/初始化事件。然后我想发送通常的事件。
例如。假设我有两个不同的订阅者 A 和 B。A 和 B 在不同的时间开始订阅。如果 MyClass.getChanges(( 发出事件编号 1、2、3、4 和 5。
如果 A 在事件 1,2 之间启动订阅,则它应收到以下事件:初始事件, 2, 3, 4, 5.
如果 B 在事件 4 和 5 之间启动订阅,则 B 应收到以下事件:初始事件, 5.
如何使用RxJava来做到这一点?
谢谢!
编辑 1
我想我需要解释一下,每次发出"初始事件"时都是不同的。每次有新订阅者开始从getChanged((订阅时,它由MyClass计算。
我的场景是MyClass包含一个列表。"初始事件"包含订阅完成时的列表。然后,对这个列表的每个更改都是从getChanges((发出的。
很抱歉在 2 年后发布这个,但我有同样的需求,发现这个问题没有得到解答。
我所做的如下:
public Observable<Event> observe() {
return Observable.defer(() ->
subject.startWith(createInitialEvent())
);
}
思路如下:
- defer(( 在观察者订阅由方法 observe(( 返回的 Observable 时执行传入的 lambda 表达式。所以基本上,它执行 subject.startWith(...(,它返回一个 Observable,它是订阅者的实际事件源。
- subject.startWith(...( 发出一个初始事件(由 startWith(...( 指定(,后跟主题发出的事件。
所以,如果我回到原来的帖子:如果观察者在事件 1,2 之间启动订阅,则它应收到以下事件:初始事件、2、3、4、5。
您要查找的是PublishSubject
。受试者Observables
很热门,因为他们不会等到Observers
subscribe
他们才开始发射他们的物品。 这里有一些关于主题的信息。
以下是您的用例的快速演示
PublishSubject<String> subject = PublishSubject.create();
Observable<String> InitEvent = Observable.just("init");
Observable<String> A = subject.asObservable();
Observable<String> B = subject.asObservable();
subject.onNext("1");
A.startWith(InitEvent)
.subscribe(s -> System.out.println("A: " + s));
subject.onNext("2");
subject.onNext("3");
subject.onNext("4");
B.startWith(InitEvent)
.subscribe(s -> System.out.println("B: " + s));
subject.onNext("5");
可能不是很优雅的方式,只使用标志怎么样?看起来您只想替换第一个发出的事件。
例如,对于一个订阅,以下逻辑:
boolean firstTimeA = true;
myCustomObservable.subscribe(s -> {
System.out.println(firstTimeA ? "initEvent" : s.toString());
if(firstTimeA) firstTimeA = false;
});
由于您想要拥有第二个订阅,只需创建一个firstTimeB
并将其更新为您的 B 订阅即可。
如果我明白你在问什么,这样的事情应该对你有用
int last = 0;
Observable obs;
List<Integer> list = new ArrayList<>();
public SimpleListObservable() {
obs = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
while(last < 30) {
last++;
list.add(last);
subscriber.onNext(last);
}
subscriber.onCompleted();
}
});
}
public Observable<Integer> process() {
return Observable.from(list).concatWith(obs);
}
当源可观察量收集值时,它们被添加到List
中(您可以根据需要转换项目,过滤掉它们等(,然后当ObserverB
订阅时,它将重播已在List
中收集的项目,然后继续源可观察输出。
这个简单的测试应该证明结果
public void testSequenceNext() {
final SimpleListObservable obs = new SimpleListObservable();
final Observer<Integer> ob2 = Mockito.mock(Observer.class);
obs.process().subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
ob1Complete = true;
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onNext(Integer integer) {
System.out.println("ob1: " + integer);
if (integer == 20) {
obs.process().subscribe(ob2);
}
}
});
ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
Mockito.verify(ob2, Mockito.times(30)).onNext(captor.capture());
for (Integer value : captor.getAllValues()) {
System.out.println(value);
}
}
你怎么看这个,我当然已经做了我的API的一部分,因为我在手机上:
public class StreamOfSomething {
new StreamOfSomething() {
// source of events like
events = Observable.range(0, 1_000_000_000)
.doOnNext(set::add) // some operation there
.map(Event::change)
.publish()
.refCount();
}
public Observable<Event> observeChanges() {
return events.startWith(
Observable.just(Event.snapshot(set))); // start stream with generated event
}
}
客户端可以执行以下操作:
Observable.timer(2, 4, TimeUnit.SECONDS)
.limit(2)
.flatMap(t -> theSourceToWatch.observeChanges().limit(10))
.subscribe(System.out::println);
但是请注意,如果您处于多线程环境中,则可能需要在订阅时进行同步以阻止任何修改,否则列表可能会在发出之前更改。或者完全围绕可观察量重新设计这个类,但我还不知道如何实现这一点。