如何在Rxjava中观察套接字直到取消订阅



我写了一小段代码,它接受一个参数ObjectInputStream并观察套接字上的数据。一旦readObject()函数返回"null",并且由于函数observeSocket(ObjectInputStream in)只接受Object,订阅者执行onError()函数并终止程序,就会引发问题。

但是我需要的是继续观察对象的套接字,只有在套接字上观察到对象时才返回,只有当观察者取消订阅时才应该终止函数的功能。如何修改代码以实现所需的功能?

public Observable<Object> observeSocket(ObjectInputStream in){
    return Observable.create(subscriber -> {
        while(!subscriber.isUnsubscribed()) {
            subscriber.onNext(getData(in));
        }
        subscriber.onCompleted();
    });
}
public Object getData(ObjectInputStream in){
    Object streamData = null;
    try{
        streamData = in.readObject();
    }
    catch(IOException e){
        //e.printStackTrace();
    }
    catch(ClassNotFoundException e){
        e.printStackTrace();
    }
    return streamData;
}

避免使用Observable.create(OnSubscribe),因为让可观察对象感知反压并符合契约是一件棘手的事情。这是使用Observable.create(SyncOnSubscribe)的一个很好的候选:

ObjectInputStream ois = ...;
Observable<Object> objects = 
  Observable.create(
    SyncOnSubscribe.createStateless(observer -> {
      try {
          Object value = ois.readObject();
          // you decide how end of file is indicated
          // a common strategy is to write a null object
          // to the end of the Object stream.
          if (value == END_OF_FILE) {
              observer.onCompleted();
          } else {
              observer.onNext(value);
          }
      } catch (Exception e) {
          observer.onError(e);
      }
    }));          

相关内容

  • 没有找到相关文章

最新更新