RxJava.订阅开始时的初始下一页



我正在尝试实现一个使用可观察量发出更改的类。当订阅完成此可观察量时,我想发送一个启动/初始化事件。然后我想发送通常的事件。

例如。假设我有两个不同的订阅者 A 和 B。A 和 B 在不同的时间开始订阅。如果 MyClass.getChanges(( 发出事件编号 1、2、3、4 和 5。

如果 A 在事件 1,2 之间启动订阅,则它应收到以下事件:初始事件, 2, 3, 4, 5.

如果 B 在事件 4 和 5 之间启动订阅,则 B 应收到以下事件:初始事件, 5.

如何使用RxJava来做到这一点?

谢谢!

编辑 1

我想我需要解释一下,每次发出"初始事件"时都是不同的。每次有新订阅者开始从getChanged((订阅时,它由MyClass计算。

我的场景是MyClass包含一个列表。"初始事件"包含订阅完成时的列表。然后,对这个列表的每个更改都是从getChanges((发出的。

很抱歉在 2 年后发布这个,但我有同样的需求,发现这个问题没有得到解答。

我所做的如下:

public Observable<Event> observe() {
    return Observable.defer(() -> 
        subject.startWith(createInitialEvent())
    );
}

思路如下:

  • defer(( 在观察者订阅由方法 observe(( 返回的 Observable 时执行传入的 lambda 表达式。所以基本上,它执行 subject.startWith(...(,它返回一个 Observable,它是订阅者的实际事件源。
  • subject.startWith(...( 发出一个初始事件(由 startWith(...( 指定(,后跟主题发出的事件。

所以,如果我回到原来的帖子:如果观察者在事件 1,2 之间启动订阅,则它应收到以下事件:初始事件、2、3、4、5。

您要查找的是PublishSubject。受试者Observables很热门,因为他们不会等到Observers subscribe他们才开始发射他们的物品。 这里有一些关于主题的信息。

以下是您的用例的快速演示

    PublishSubject<String> subject = PublishSubject.create();
    Observable<String> InitEvent = Observable.just("init");
    Observable<String> A = subject.asObservable();
    Observable<String> B = subject.asObservable();
    subject.onNext("1");
    A.startWith(InitEvent)
            .subscribe(s -> System.out.println("A: " + s));
    subject.onNext("2");
    subject.onNext("3");
    subject.onNext("4");
    B.startWith(InitEvent)
            .subscribe(s -> System.out.println("B: " + s));
    subject.onNext("5");

可能不是很优雅的方式,只使用标志怎么样?看起来您只想替换第一个发出的事件。

例如,对于一个订阅,以下逻辑:

boolean firstTimeA = true;
myCustomObservable.subscribe(s -> {
   System.out.println(firstTimeA ? "initEvent" : s.toString());
   if(firstTimeA) firstTimeA = false;
});

由于您想要拥有第二个订阅,只需创建一个firstTimeB并将其更新为您的 B 订阅即可。

如果我明白你在问什么,这样的事情应该对你有用

int last = 0;
Observable obs;
List<Integer> list = new ArrayList<>();
public SimpleListObservable() {
    obs = Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            while(last < 30) {
                last++;
                list.add(last);
                subscriber.onNext(last);
            }
            subscriber.onCompleted();
        }
    });
}
public Observable<Integer> process() {
    return Observable.from(list).concatWith(obs);
}

当源可观察量收集值时,它们被添加到List中(您可以根据需要转换项目,过滤掉它们等(,然后当ObserverB订阅时,它将重播已在List中收集的项目,然后继续源可观察输出。

这个简单的测试应该证明结果

public void testSequenceNext() {
    final SimpleListObservable obs = new SimpleListObservable();
    final Observer<Integer> ob2 = Mockito.mock(Observer.class);
    obs.process().subscribe(new Observer<Integer>() {
        @Override
        public void onCompleted() {
            ob1Complete = true;
        }
        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }
        @Override
        public void onNext(Integer integer) {
            System.out.println("ob1: " + integer);
            if (integer == 20) {
                obs.process().subscribe(ob2);
            }
        }
    });
    ArgumentCaptor<Integer> captor = ArgumentCaptor.forClass(Integer.class);
    Mockito.verify(ob2, Mockito.times(30)).onNext(captor.capture());
    for (Integer value : captor.getAllValues()) {
        System.out.println(value);
    }
}

你怎么看这个,我当然已经做了我的API的一部分,因为我在手机上:

public class StreamOfSomething {
    new StreamOfSomething() {
        // source of events like
        events = Observable.range(0, 1_000_000_000)
                           .doOnNext(set::add) // some operation there
                           .map(Event::change)
                           .publish()
                           .refCount();
    }
    public Observable<Event> observeChanges() {
        return events.startWith(
                 Observable.just(Event.snapshot(set))); // start stream with generated event
    }
}

客户端可以执行以下操作:

Observable.timer(2, 4, TimeUnit.SECONDS)
          .limit(2)
          .flatMap(t -> theSourceToWatch.observeChanges().limit(10))
          .subscribe(System.out::println);

但是请注意,如果您处于多线程环境中,则可能需要在订阅时进行同步以阻止任何修改,否则列表可能会在发出之前更改。或者完全围绕可观察量重新设计这个类,但我还不知道如何实现这一点。

最新更新