我对RxJava, android和Java都很陌生…我试图创建一个发出同步列表的可观察对象。我哪里错了?
public class CurrentLocationHolder {
private List<LocationPoint> locationBuffer = Collections.synchronizedList(new ArrayList<>());
public final PublishSubject<List<LocationPoint>> locationBufferChanged = PublishSubject.create();
public Observable<List<LocationPoint>> observeLocationBufferChanged(boolean emitCurrentValue) {
return emitCurrentValue ? locationBufferChanged.startWith(locationPointsBuffer) : locationBufferChanged;
}
public void setLocation(LocationPoint point) {
locationBuffer.add(point);
if (locationBuffer.size() >= 10) {
locationBufferChanged.onNext(this.locationBuffer);
}
locationBufferChanged.onCompleted();
locationBuffer.clear();
}
}
这是我的订阅者#1对象:
public class DatabaseManager {
private Subscription locationBufferSubscription;
private static DatabaseManager instance;
public static void InitInstance() {
if (instance == null) {
instance = new DatabaseManager();
instance.changeLocationBufferSubscription =
CurrentLocationHolder.getInstance().observeLocationBufferChanged()
.subscribe(locArray -> {
ActiveAndroid.beginTransaction();
try {
for (int i = 0; i < locArray.size(); i++) {
locArray.get(i).save();
}
ActiveAndroid.setTransactionSuccessful();
} finally {
ActiveAndroid.endTransaction();
}
});
}
}
}
所以,如果我要创建另一个订阅者,例如HttpManager,它也将监听缓冲区的变化,我的缓冲区将是同步的所有监听器吗?在所有监听器处理了所有10个LocationPoints之后,我的列表会被清除吗?我的代码是不是有点过头了?
所以,答案是复制locationBuffer到新的ArrayList时调用onNext的主题:
if (locationBuffer.size() >= 10) {
locationBufferChanged.onNext(new ArrayList<>(this.locationBuffer));
}
locationBuffer.clear();