不需要创建JavaRx可观察对象,向所有订阅者发送同步(或阻塞)列表



我对RxJava, android和Java都很陌生…我试图创建一个发出同步列表的可观察对象。我哪里错了?

public class CurrentLocationHolder {
    private List<LocationPoint> locationBuffer = Collections.synchronizedList(new ArrayList<>());
    public final PublishSubject<List<LocationPoint>> locationBufferChanged = PublishSubject.create();
    public Observable<List<LocationPoint>> observeLocationBufferChanged(boolean emitCurrentValue) {
        return emitCurrentValue ? locationBufferChanged.startWith(locationPointsBuffer) : locationBufferChanged;
    }
    public void setLocation(LocationPoint point) {
        locationBuffer.add(point);
        if (locationBuffer.size() >= 10) {
            locationBufferChanged.onNext(this.locationBuffer);
        }
        locationBufferChanged.onCompleted();
        locationBuffer.clear();
    }
}

这是我的订阅者#1对象:

public class DatabaseManager {
    private Subscription locationBufferSubscription;
    private static DatabaseManager instance;
    public static void InitInstance() {
        if (instance == null) {
            instance = new DatabaseManager();
        instance.changeLocationBufferSubscription = 
            CurrentLocationHolder.getInstance().observeLocationBufferChanged()
                .subscribe(locArray -> {
                    ActiveAndroid.beginTransaction();
                    try {
                        for (int i = 0; i < locArray.size(); i++) {
                            locArray.get(i).save();
                        }
                        ActiveAndroid.setTransactionSuccessful();
                    } finally {
                        ActiveAndroid.endTransaction();
                    }
            });
        }
    }
}

所以,如果我要创建另一个订阅者,例如HttpManager,它也将监听缓冲区的变化,我的缓冲区将是同步的所有监听器吗?在所有监听器处理了所有10个LocationPoints之后,我的列表会被清除吗?我的代码是不是有点过头了?

所以,答案是复制locationBuffer到新的ArrayList时调用onNext的主题:

if (locationBuffer.size() >= 10) {
    locationBufferChanged.onNext(new ArrayList<>(this.locationBuffer));
}
locationBuffer.clear();

最新更新