使用Observable和take运算符读取文件时出现RxJava问题



我的工作环境是JDK 1.6和RxJava 2

我想制作一个Observable,它发出一个项目,该项目是通过BufferedReader读取的文件行字符串,如下所示:

...
Observable<String> fileLineObservable = Observable.defer(new Callable<String>(){
return new ObservableSource<String> call() throws Exception {
return new ObservableSource<String>() {
public void subscribe(Observer<String> observer) {
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader(filePath));
String line = null;
while ((line = reader.readLine()) != null) {
observer.onNext(line);
}
observer.onComplete();
... catching exception and close reader
}
}
}
}
});

我还想让一个观察者用一个take(count(操作符观察上面的Observable,如下所示:

fileLineObservable.take(2)
.subscribe(new Consumer<String>() {
public void onNext(String line) {
... do something with the file line string
}
});

我在执行上面的代码时遇到了NullPointerException,我知道为什么。引起NPE的原因是onNext的第二次调用导致在TakeObserver实例上执行onComplete,并且在onComplete方法内部,调用未设置(null(的upstream.dispose。TakeObserver的上游变量应该在订阅Observable时设置onSubscribe(一次性使用(。

我该如何解决这个问题?我应该实现我自己的Disposable类来设置TakeObserver的上游吗?

这个解决方案怎么样?

Observable<String> observableFile2(Path path) {
return Observable.using(
() -> Files.newBufferedReader(path),
reader -> {
return Observable.fromIterable(() -> {
return new Iterator<>() {
private String nextLine = null;
@Override
public boolean hasNext() {
try {
nextLine = reader.readLine();
return nextLine != null;
} catch (Exception ex) {
return false;
}
}
@Override
public String next() {
if (nextLine != null) {
return nextLine;
}
throw new IllegalStateException("nextLine can not be null.");
}
};
});
},
BufferedReader::close
);
}
  • Observable#using确保BufferedReader在一次性/onError上正确关闭
  • Observable#fromIterable为我们包装了readLine调用和onComplete处理

测试

testImplementation("org.junit.jupiter:junit-jupiter-api:5.6.2")
testRuntimeOnly("org.junit.platform:junit-platform-launcher:1.6.2")
testRuntimeOnly("org.junit.jupiter:junit-jupiter-engine:5.6.2")
testRuntimeOnly("org.junit.vintage:junit-vintage-engine:5.6.2")
testImplementation("com.google.jimfs:jimfs:1.1")

测试

@Test
void name() {
observableFile2(hello).take(2)
.test()
.assertValues("line0", "line1")
.assertComplete();
}
@Test
void name2() {
observableFile2(hello).take(10)
.test()
.assertValues("line0", "line1", "line2", "line3")
.assertComplete();
}
@Test
void name3() {
observableFile2(hello2)
.test()
.assertComplete();
}

最新更新