使用 Kotlin 的 RxJava - 如何同步 2 个异步方法,从 Java 重构



我有 2 个集合,用于缓冲位置更新事件:

     private List<LocationGeoEvent> mUpdateGeoEvents = new ArrayList<>();
    private List<LocationRSSIEvent> mUpdateRSSIEvents = new ArrayList<>();

我的代码中也存在:

        private final ScheduledExecutorService mSaveDataExecutor = Executors.newSingleThreadScheduledExecutor();
    private boolean mSaveDataScheduled;
    private final Object mEventsMonitor = new Object();
    private ScheduledFuture<?> mScheduledStopLocationUpdatesFuture;
    private final ScheduledExecutorService mStopLocationUpdatesExecutor = Executors.newSingleThreadScheduledExecutor();

我像这样将事件添加到此集合中:

    public void appendGeoEvent(LocationGeoEvent event) {
            synchronized (mEventsMonitor) {
                mUpdateGeoEvents.add(event);
                scheduleSaveEvents();
            }
    }

RSSI 事件也是如此

现在,计划保存事件方法如下所示:

      private void scheduleSaveEvents() {
        synchronized (mSaveDataExecutor) {
            if (!mSaveDataScheduled) {
                mSaveDataScheduled = true;
                mSaveDataExecutor.schedule(
                        new Runnable() {
                            @Override
                            public void run() {
                                synchronized (mSaveDataExecutor) {
                                    saveEvents(false);
                                    mSaveDataScheduled = false;
                                }
                            }
                        },
                        30,
                        TimeUnit.SECONDS);
            }
        }
    }

问题是,我需要同步停止更新的另一种方法。它是这样触发的:

      private void scheduleStopLocationUpdates() {
        synchronized (mStopLocationUpdatesExecutor) {
            if (mScheduledStopLocationUpdatesFuture != null)
                mScheduledStopLocationUpdatesFuture.cancel(true);
            mScheduledStopLocationUpdatesFuture = mStopLocationUpdatesExecutor.schedule(
                    new Runnable() {
                        @Override
                        public void run() {
                            synchronized (mStopLocationUpdatesExecutor) {
                                stopLocationUpdates();
                                saveEvents(true);
                                cleanAllReadingsData();
                            }
                        }
                    },
                    45,
                    TimeUnit.SECONDS);
        }
    }

在保存事件方法中,我这样做:

    private void saveEvents(boolean locationUpdatesAboutToStop) {
        synchronized (mEventsMonitor) {
            if (mUpdateGeoEvents.size() > 0 || mUpdateRSSIEvents.size() > 0) {
                 //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop
                mUpdateGeoEvents.clear();
                mUpdateRSSIEvents.clear();
            }
        }
    }

有没有办法使用 Kotlin 将这个简化器重构为 RxJava?

更新

这是我的附录RSSIevents方法:

    private fun appendRSSIEvent(event: LocationRSSIEvent) {
    synchronized(mEventsMonitor) {
        if (!shouldSkipRSSIData(event.nexoIdentifier)) {
            mUpdateRSSIEvents.add(event)
            acknowledgeDevice(event.nexoIdentifier)
            scheduleSaveEvents()
            startLocationUpdates()
        } else
            removeExpiredData()
    }
}

您可以缓冲两个数据流,然后将它们组合起来进行保存。此外,还可以使用缓冲区触发器停止更新。

PublishSubject<LocationGeoEvent> mUpdateGeoEventsSubject = PublishSubject.create();
PublishSubject<LocationRSSIEvent> mUpdateRSSIEventsSubject = PublishSubject.create();
public void appendGeoEvent(LocationGeoEvent event) {
  mUpdateGeoEventsSubject.onNext( event );
  triggerSave.onNext( Boolean.TRUE );
}

RSS 提要也是如此。

现在我们需要用于驱动保存步骤的触发器。

PublishSubject<Boolean> triggerSave = PublishSubject.create();
PublishSubject<Boolean> triggerStopAndSave = PublishSubject.create();
Observable<Boolean> normalSaveTrigger = triggerSave.debounce( 30, TimeUnit.SECONDS );
Observable<Boolean> trigger = Observable.merge( normalSaveTrigger, triggerStopAndSave );

当正常的保存过程触发或我们正在停止保存时,trigger可观察的触发。

private void saveEvents(
  List<LocationGeoEvent> geo,
  List<LocationRSSIEvent> rss,
  boolean locationUpdatesAboutToStop) {
    synchronized (mEventsMonitor) {
        if (geo.size() > 0 || rss.size() > 0) {
             //do something with the data from buffered collection arrayLists and with the boolean locationUpdatesAboutToStop
        }
    }
}
private void scheduleStopLocationUpdates() {
  stopLocationUpdates();
  triggerStopAndSave.onNext( Boolean.FALSE );
  cleanAllReadingsData();
}
Observable.zip( mUpdateGeoEventsSubject.buffer( trigger ),
                mUpdateRSSIEventsSubject.buffer( trigger ),
                trigger, (geo, rss, trgr) -> saveEvents(geo, rss, trgr) )
  .subscribe();

您仍然需要在多线程和安全性方面进行一些调整。第一步是将各种主题转换为SerializedSubject,以便多个线程可以发出事件。

如果您希望saveEvents在特定调度程序上运行,则需要添加一个中间数据结构(三元组)以通过运算符传递参数observeOn()或者observeOn()运算符应用于每个参数zip()

最新更新