我写了一小段代码,它接受一个参数ObjectInputStream并观察套接字上的数据。一旦readObject()函数返回"null",并且由于函数observeSocket(ObjectInputStream in)只接受Object,订阅者执行onError()函数并终止程序,就会引发问题。
但是我需要的是继续观察对象的套接字,只有在套接字上观察到对象时才返回,只有当观察者取消订阅时才应该终止函数的功能。如何修改代码以实现所需的功能?
public Observable<Object> observeSocket(ObjectInputStream in){
return Observable.create(subscriber -> {
while(!subscriber.isUnsubscribed()) {
subscriber.onNext(getData(in));
}
subscriber.onCompleted();
});
}
public Object getData(ObjectInputStream in){
Object streamData = null;
try{
streamData = in.readObject();
}
catch(IOException e){
//e.printStackTrace();
}
catch(ClassNotFoundException e){
e.printStackTrace();
}
return streamData;
}
避免使用Observable.create(OnSubscribe)
,因为让可观察对象感知反压并符合契约是一件棘手的事情。这是使用Observable.create(SyncOnSubscribe)
的一个很好的候选:
ObjectInputStream ois = ...;
Observable<Object> objects =
Observable.create(
SyncOnSubscribe.createStateless(observer -> {
try {
Object value = ois.readObject();
// you decide how end of file is indicated
// a common strategy is to write a null object
// to the end of the Object stream.
if (value == END_OF_FILE) {
observer.onCompleted();
} else {
observer.onNext(value);
}
} catch (Exception e) {
observer.onError(e);
}
}));