RxBleConnection Observable is not emitting



在我的 Android 仓库(repository)层中,我持有一个 RxBleConnection 的引用,并提供了若干返回 Observable 的读/写函数。我的问题是:这些函数确实被调用了,但没有任何响应,因为连接的 Observable 已经停止发射数据。代码如下,请问有什么建议吗?

/**
 * Repository for the lamp peripheral.
 *
 * Exposes the lamp's BLE connection as a shared stream and offers read/write
 * operations on the power-state and luminosity characteristics.
 *
 * Every operation is built on top of [lampBleConnection], so it runs once for each
 * connection that stream emits.
 *
 * @param lampBleDevice stream of lamp devices; a connection is requested for each emitted device.
 * @param bluetoothExecutor executor backing the scheduler the streams are subscribed on.
 */
class LampRepository(
    private val lampBleDevice: Observable<RxBleDevice>,
    private val bluetoothExecutor: Executor
) : IRepository {

    // Built once instead of wrapping the executor in a new Scheduler on every call.
    private val bluetoothScheduler = Schedulers.from(bluetoothExecutor)

    /** Connection-state changes of each device emitted by [lampBleDevice], shared between subscribers. */
    val lampConnectionState: Observable<RxBleConnection.RxBleConnectionState>

    /** Lamp connection stream; composed with ReplayingShare so subscribers share one upstream subscription. */
    val lampBleConnection: Observable<RxBleConnection>

    init {
        lampBleConnection = lampBleDevice.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { device ->
                Log.d(TAG, "Connecting to the lamp GATT server")
                device.establishConnection(true)
            }
            .compose(ReplayingShare.instance())

        lampConnectionState = lampBleDevice.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { device ->
                Log.d(TAG, "Observing the Lamp GATT server connection state")
                device.observeConnectionStateChanges()
            }
            .share()
    }

    /**
     * Reads the luminosity characteristic and decodes its first byte.
     *
     * @return stream of the decoded luminosity; errors with "unknown value N" for bytes outside 0..4.
     */
    fun getLampLuminosityLevel(): Observable<LampProfile.Luminosity> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { connection ->
                Log.d(TAG, "Reading the lamp luminosity characteristic")
                // Fixed: the original read STATE_CHARACTERISTIC_UUID here.
                connection.readCharacteristic(UUID.fromString(LampProfile.LUMINOSITY_CHARACTERISTIC_UUID))
                    .toObservable()
            }
            .map { bytes -> bytes[0].toInt() }
            .flatMap { raw -> toLuminosity(raw, "unknown value ${raw}") }

    /**
     * Reads the power-state characteristic and decodes its first byte.
     *
     * @return stream of the decoded state; errors with "unknown value N" for bytes other than 0 or 1.
     */
    fun getLampPowerState(): Observable<LampProfile.State> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { connection ->
                Log.d(TAG, "Reading the lamp power state characteristic")
                connection.readCharacteristic(UUID.fromString(LampProfile.STATE_CHARACTERISTIC_UUID))
                    .toObservable()
            }
            .map { bytes -> bytes[0].toInt() }
            .flatMap { raw -> toState(raw, "unknown value ${raw}") }

    /**
     * Writes [state] to the power-state characteristic.
     *
     * @return stream of the state decoded from the bytes the write operation emits.
     */
    fun setLampPowerState(state: LampProfile.State): Observable<LampProfile.State> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { connection ->
                Log.d(TAG, "writing to the Characteristic")
                connection.writeCharacteristic(
                    UUID.fromString(LampProfile.STATE_CHARACTERISTIC_UUID),
                    byteArrayOf(state.value.toByte())
                ).toObservable()
            }
            .map { bytes -> bytes[0].toInt() }
            .flatMap { raw ->
                Log.d(TAG, "Finish writing")
                toState(raw, "unknown value")
            }

    /**
     * Writes [level] to the luminosity characteristic.
     *
     * @return stream of the luminosity decoded from the bytes the write operation emits.
     */
    // NOTE(review): unlike the other operations this one never switches to the main thread — confirm intent.
    fun setLampLuminosityLevel(level: LampProfile.Luminosity): Observable<LampProfile.Luminosity> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .flatMap { connection ->
                Log.d(TAG, "Writing the lamp luminosity characteristic")
                connection.writeCharacteristic(
                    UUID.fromString(LampProfile.LUMINOSITY_CHARACTERISTIC_UUID),
                    byteArrayOf(level.value.toByte())
                ).toObservable()
            }
            .map { bytes -> bytes[0].toInt() }
            .flatMap { raw -> toLuminosity(raw, "unknown value") }

    /** Maps a raw characteristic value to a luminosity, or to an error carrying [unknownMessage]. */
    private fun toLuminosity(raw: Int, unknownMessage: String): Observable<LampProfile.Luminosity> =
        when (raw) {
            0 -> Observable.just(LampProfile.Luminosity.NON)
            1 -> Observable.just(LampProfile.Luminosity.LOW)
            2 -> Observable.just(LampProfile.Luminosity.MEDIUM)
            3 -> Observable.just(LampProfile.Luminosity.HIGH)
            4 -> Observable.just(LampProfile.Luminosity.MAX)
            else -> Observable.error(Throwable(unknownMessage))
        }

    /** Maps a raw characteristic value to a power state, or to an error carrying [unknownMessage]. */
    private fun toState(raw: Int, unknownMessage: String): Observable<LampProfile.State> =
        when (raw) {
            0 -> Observable.just(LampProfile.State.OFF)
            1 -> Observable.just(LampProfile.State.ON)
            else -> Observable.error(Throwable(unknownMessage))
        }

    companion object {
        private const val TAG = "LampRepository"
    }
}
/**
 * ViewModel for the lamp screen.
 *
 * Subscribes to the repository's connection and connection-state streams on creation,
 * forwards write requests to the repository, and releases every subscription in [onCleared].
 *
 * @param lampRepository repository providing the lamp connection and its read/write operations.
 */
class LampViewModel(private val lampRepository: LampRepository) : ViewModel() {

    // NOTE(review): nothing in this class posts to the two LiveData below — confirm where they are fed.
    private val mLampPowerState: MutableLiveData<LampProfile.State> = MutableLiveData()
    private val mLampLuminosityLevel: MutableLiveData<LampProfile.Luminosity> = MutableLiveData()
    private val mLampBleConnectionState: MutableLiveData<RxBleConnection.RxBleConnectionState> = MutableLiveData()
    private val compositeDisposable = CompositeDisposable()

    init {
        compositeDisposable.add(
            lampRepository.lampConnectionState.subscribe(
                mLampBleConnectionState::postValue,
                { Log.d(TAG, "error is ${it.message}") }
            )
        )
        // The emitted connection is ignored here; presumably this subscription only keeps the
        // shared connection stream subscribed while the ViewModel is alive — TODO confirm.
        compositeDisposable.add(
            lampRepository.lampBleConnection.subscribe(
                {},
                // Previously an empty handler: connection failures vanished silently.
                { Log.d(TAG, "connection error is ${it.message}") }
            )
        )
    }

    fun getLampLuminosityLevel(): LiveData<LampProfile.Luminosity> = mLampLuminosityLevel
    fun getLampPowerState(): LiveData<LampProfile.State> = mLampPowerState
    fun getLampConnectionState(): LiveData<RxBleConnection.RxBleConnectionState> = mLampBleConnectionState

    /** Asks the repository to write [state] and logs the outcome. */
    fun setLampPowerState(state: LampProfile.State) {
        Log.d(TAG, "Into the function")
        compositeDisposable.add(
            lampRepository.setLampPowerState(state).subscribe(
                { Log.d(TAG, "writing with success $it") },
                { Log.d(TAG, "error while writing ${it.message}") }
            )
        )
    }

    /** Asks the repository to write [level]; only failures are logged. */
    fun setLampLuminosityLevel(level: LampProfile.Luminosity) {
        compositeDisposable.add(
            lampRepository.setLampLuminosityLevel(level).subscribe(
                {},
                { Log.d(TAG, "writing error") }
            )
        )
    }

    /** Disposes every subscription taken by this ViewModel so none outlives it. */
    override fun onCleared() {
        compositeDisposable.clear()
        super.onCleared()
    }

    companion object {
        val TAG = "LampViewModel"
    }
}
/**
 * Default [ServiceLocator]: builds the BLE client, the scan/device streams and the
 * repositories that depend on them.
 *
 * @param activity context used to create the BLE client and the permissions helper.
 */
class DefaultServiceLocator(private val activity: FragmentActivity) : ServiceLocator {

    private val blueToothClient = RxBleClient.create(activity)
    private val rxPermissions = RxPermissions(activity)
    private val bluetoothIo = Executors.newFixedThreadPool(2)
    private val networkIo = Executors.newFixedThreadPool(1)
    private val bluetoothScan: Observable<ScanResult>
    private val bluetoothClientState: Observable<RxBleClient.State>
    private val lampBleDevice: Observable<RxBleDevice>
    private val broadLinkBleDevice: Observable<RxBleDevice>
    private val broadLinkRepository: BroadLinkRepository

    // SECURITY NOTE(review): access token hard-coded in source — move it to build config / secure storage.
    private val mConfig = AIConfiguration(
        "e87e26ceb2ae4519ace2f3c71abd076e",
        ai.api.AIConfiguration.SupportedLanguages.English,
        AIConfiguration.RecognitionEngine.System
    )

    // Renamed from `AIDataService`, which shadowed the class of the same name.
    private val aiDataService = AIDataService(mConfig)

    // One LampRepository for every caller, mirroring the single shared broadLinkRepository:
    // each LampRepository builds its own connection stream for the same device.
    private val lampRepository by lazy {
        LampRepository(
            lampBleDevice = getBleDevice(BleDevices.LAMP),
            bluetoothExecutor = getBlueToothExecutor()
        )
    }

    init {
        bluetoothClientState = blueToothClient.observeStateChanges()
            .observeOn(Schedulers.from(getBlueToothExecutor()))
            .subscribeOn(AndroidSchedulers.mainThread())
            // Read the state at subscription time. Observable.just(...) captured it once at
            // construction, so re-subscriptions of this shared stream replayed a stale value.
            .startWith(Observable.fromCallable { blueToothClient.state })
            .share()

        bluetoothScan = bluetoothClientState.filter { it == RxBleClient.State.READY }
            .delay(1000, TimeUnit.MILLISECONDS)
            .flatMap {
                blueToothClient.scanBleDevices(
                    ScanSettings.Builder()
                        .setScanMode(ScanSettings.SCAN_MODE_LOW_LATENCY)
                        .setCallbackType(ScanSettings.CALLBACK_TYPE_FIRST_MATCH)
                        .build(),
                    ScanFilter.Builder().build()
                )
            }
            .share()

        // NOTE(review): retry() below resubscribes immediately and without limit on any scan error.
        broadLinkBleDevice = bluetoothScan
            .filter { it.bleDevice.macAddress == BroadLinkProfile.DEVICE_MAC_ADDRESS }
            .map { result ->
                Log.d(TAG, "the device named ${result.bleDevice.name} is found")
                result.bleDevice
            }
            .retry()
            .share()

        lampBleDevice = bluetoothScan
            .filter { it.bleDevice.macAddress == LampProfile.DEVICE_MAC_ADDRESS }
            .map { result ->
                Log.d(TAG, "the device named ${result.bleDevice.name} is found")
                result.bleDevice
            }
            .retry()
            .share()

        broadLinkRepository = BroadLinkRepository(broadLinkBleDevice, getBlueToothExecutor())
    }

    override fun getBlueToothExecutor() = bluetoothIo

    override fun getNetworkExecutor() = networkIo

    /** Returns the shared device stream for [key]. */
    override fun getBleDevice(key: BleDevices): Observable<RxBleDevice> = when (key) {
        BleDevices.LAMP -> lampBleDevice
        BleDevices.BROAD_LINK -> broadLinkBleDevice
    }

    /**
     * Returns the repository for [key]. The lamp repository is created on first request and
     * then reused; the other repositories are created per call.
     */
    override fun getRepository(key: Repositories): IRepository = when (key) {
        Repositories.LAMP_REPOSITORY -> lampRepository
        Repositories.TV_REPOSITORY -> TvRepository(
            broadLinkRepository = broadLinkRepository,
            bluetoothExecutor = getBlueToothExecutor()
        )
        Repositories.AIR_CONDITIONER_REPOSITORY -> AirConditionerRepository(
            broadLinkRepository = broadLinkRepository,
            bluetoothExecutor = getBlueToothExecutor()
        )
        Repositories.DIALOG_REPOSITORY -> DialogRepository(
            AIService = aiDataService,
            networkExecutor = getNetworkExecutor()
        )
    }

    companion object {
        private const val TAG = "DefaultServiceLocator"
    }
}

由于同时使用了 ReplayingShare 和 .share(),lampBleConnection 在每次连接中只会发射一次。

ReplayingShare 会共享对上游 Observable 的订阅,并把最后一个事件重新发射给新的订阅者。.share() 做了其中一部分相同的事情:它同样共享对上游 Observable 的订阅。两者叠加使用会让 ReplayingShare 失去作用,因为它只会被订阅一次。

结论:从 lampBleConnection 和 setLampPowerState() 中删除 .share()。

下面是删除了 .share() 的(原始)代码,应该可以解决这个问题:

/**
 * Lamp repository with the extra `.share()` calls removed.
 *
 * [lampBleConnection] is composed with ReplayingShare only; every read/write operation is
 * derived from it and therefore runs once per connection that stream emits.
 *
 * @param lampBleDevice stream of lamp devices; a connection is requested for each emitted device.
 * @param bluetoothExecutor executor backing the scheduler the streams are subscribed on.
 */
class LampRepository(
    private val lampBleDevice: Observable<RxBleDevice>,
    private val bluetoothExecutor: Executor
) : IRepository {

    // Declared first: the property initializers below use it.
    private val bluetoothScheduler = Schedulers.from(bluetoothExecutor)

    /** Connection-state changes of each device emitted by [lampBleDevice], shared between subscribers. */
    val lampConnectionState: Observable<RxBleConnection.RxBleConnectionState>

    /** Lamp connection stream shared through ReplayingShare. */
    val lampBleConnection: Observable<RxBleConnection>

    init {
        lampBleConnection = lampBleDevice.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { device ->
                Log.d(TAG, "Connecting to the lamp GATT server")
                device.establishConnection(true)
            }
            .compose(ReplayingShare.instance())

        lampConnectionState = lampBleDevice.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { device ->
                Log.d(TAG, "Observing the Lamp GATT server connection state")
                device.observeConnectionStateChanges()
            }
            .share()
    }

    /**
     * Reads the luminosity characteristic.
     *
     * @return stream of the luminosity decoded from the first byte read; errors on values outside 0..4.
     */
    fun getLampLuminosityLevel(): Observable<LampProfile.Luminosity> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { connection ->
                Log.d(TAG, "Reading the lamp luminosity characteristic")
                // Fixed: this previously read STATE_CHARACTERISTIC_UUID.
                connection.readCharacteristic(UUID.fromString(LampProfile.LUMINOSITY_CHARACTERISTIC_UUID))
                    .toObservable()
            }
            .map(::firstByte)
            .flatMap { raw -> luminosityOf(raw, "unknown value ${raw}") }

    /**
     * Reads the power-state characteristic.
     *
     * @return stream of the state decoded from the first byte read; errors on values other than 0 or 1.
     */
    fun getLampPowerState(): Observable<LampProfile.State> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { connection ->
                Log.d(TAG, "Reading the lamp power state characteristic")
                connection.readCharacteristic(UUID.fromString(LampProfile.STATE_CHARACTERISTIC_UUID))
                    .toObservable()
            }
            .map(::firstByte)
            .flatMap { raw -> stateOf(raw, "unknown value ${raw}") }

    /**
     * Writes [state] to the power-state characteristic.
     *
     * @return stream of the state decoded from the bytes the write operation emits.
     */
    fun setLampPowerState(state: LampProfile.State): Observable<LampProfile.State> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .observeOn(AndroidSchedulers.mainThread())
            .flatMap { connection ->
                Log.d(TAG, "writing to the Characteristic")
                connection.writeCharacteristic(
                    UUID.fromString(LampProfile.STATE_CHARACTERISTIC_UUID),
                    byteArrayOf(state.value.toByte())
                ).toObservable()
            }
            .map(::firstByte)
            .flatMap { raw ->
                Log.d(TAG, "Finish writing")
                stateOf(raw, "unknown value")
            }

    /**
     * Writes [level] to the luminosity characteristic.
     *
     * @return stream of the luminosity decoded from the bytes the write operation emits.
     */
    // NOTE(review): no observeOn(mainThread) here, unlike the other three operations — confirm intent.
    fun setLampLuminosityLevel(level: LampProfile.Luminosity): Observable<LampProfile.Luminosity> =
        lampBleConnection.subscribeOn(bluetoothScheduler)
            .flatMap { connection ->
                Log.d(TAG, "Writing the lamp luminosity characteristic")
                connection.writeCharacteristic(
                    UUID.fromString(LampProfile.LUMINOSITY_CHARACTERISTIC_UUID),
                    byteArrayOf(level.value.toByte())
                ).toObservable()
            }
            .map(::firstByte)
            .flatMap { raw -> luminosityOf(raw, "unknown value") }

    /** Returns the first byte of a characteristic value as an Int. */
    private fun firstByte(bytes: ByteArray): Int = bytes[0].toInt()

    /** Decodes a raw value into a luminosity, or emits an error carrying [unknownMessage]. */
    private fun luminosityOf(raw: Int, unknownMessage: String): Observable<LampProfile.Luminosity> =
        when (raw) {
            0 -> Observable.just(LampProfile.Luminosity.NON)
            1 -> Observable.just(LampProfile.Luminosity.LOW)
            2 -> Observable.just(LampProfile.Luminosity.MEDIUM)
            3 -> Observable.just(LampProfile.Luminosity.HIGH)
            4 -> Observable.just(LampProfile.Luminosity.MAX)
            else -> Observable.error(Throwable(unknownMessage))
        }

    /** Decodes a raw value into a power state, or emits an error carrying [unknownMessage]. */
    private fun stateOf(raw: Int, unknownMessage: String): Observable<LampProfile.State> =
        when (raw) {
            0 -> Observable.just(LampProfile.State.OFF)
            1 -> Observable.just(LampProfile.State.ON)
            else -> Observable.error(Throwable(unknownMessage))
        }

    companion object {
        private const val TAG = "LampRepository"
    }
}

最新更新