订阅者的 onComplete 方法是否由 Android 上的 Room 在后台线程上调用?



我正在使用Room实现Android应用程序。目前,我已经将 Guave 和 Jetpack libaries 包含在我项目中的其他部分。但是,我不介意包含另一个库(即 JxJava),如果这有助于解决我的问题。但是,我没有找到任何权威文档来回答我的问题。

我需要对数据进行异步、可观察的查询,并且观察处理程序必须在后台线程上运行,因为我必须对结果进行一些昂贵的后处理。

对于Java,有两个选项可以与房间进行异步交互:使用Rxjava或Guava/LiveData(请参阅编写异步DAO查询 - 语言和框架选项)

如果我想要一个异步的、一次性的(而不是可观察的)查询并采用 Guava/LiveData,那么 API 会返回一个ListenableFuture(请参阅编写异步 DAO 查询 - 编写异步一次性查询)。ListenableFutureRunnable作为侦听器,该侦听器与用于在数据更改时调度侦听器的Executator相关联(请参阅 ListenableFuture#addListener)。这很棒,因为我的应用程序中已经有一个用于后台任务的中央ThreadPool执行器。但是,这是我不想要的一次性查询。

与 Guava/LiveData 结合使用的异步可观察查询将返回一个LiveData(请参阅编写异步 DAO 查询 - 编写异步可观察查询。这很遗憾,因为LiveDataObserveronChange方法总是在主(GUI)线程上执行。这是LiveData的设计原则,因为它们应该更新UI,但这不是我需要的。当然,我可以在主线程上使用onChange方法,在我的后台执行器上调度一个Runnable并再次跳转到后台,但这似乎涉及后台和主踏板之间不必要的上下文切换。

所以我考虑使用RxJava。与 RxJava 结合使用的异步、可观察查询将返回一个Flowable。可以使用Consumer订阅Flowable。但是,当Flowable发出新值时,我没有找到有关Consumeraccept方法调度哪个线程的任何信息。根据RxJava文档,这是Flowable创建者的责任,因为RxJava只定义了抽象接口,但没有规定特定的实现。在手头的情况下,Flowable的创建者是 房间库,但它似乎没有记录,哪个线程由 房间 用于Flowable.

房间是否使用主线程来更新其Flowable?(那会很糟糕,并且没有比LiveData改进)。Room 是否使用与数据库查询相同的后台线程来更新其Flowable?(这不会完全坏,但仍然可以改进。还是 Room 会分叉一个仅用于更新其Flowable的新线程?(好)

奖励:为了保持较少的分叉和销毁线程的数量,如果 Room 不仅可以将我的应用程序范围的ThreadPoolExecutor用于Flowables,还可以用于其查询和所有其他异步任务,我将不胜感激。

你在文档中找不到的东西,总是可以在源代码中找到:D

假设这样的查询:

@Query("SELECT * FROM table")
fun query(): Flowable<List<Entity>>

Room 的注释处理器将生成一些代码,其中包含以下内容:

return RxRoom.createFlowable(__db, true, new String[]{"table"}, new Callable<List<Entity>>() {... /*not important here*/}

检查此方法的实现:

Scheduler scheduler = Schedulers.from(getExecutor(database, inTransaction));
final Maybe<T> maybe = Maybe.fromCallable(callable);
return createFlowable(database, tableNames)
.subscribeOn(scheduler)
.unsubscribeOn(scheduler)
.observeOn(scheduler)
.flatMapMaybe(new Function<Object, MaybeSource<T>>() {
@Override
public MaybeSource<T> apply(Object o) throws Exception {
return maybe;
}
});

因此,他们使用从执行器创建的调度程序。 您可以在此处找到有关此遗嘱执行器的一些信息androidx.room.RoomDatabase.Builder#setQueryExecutor

* When both the query executor and transaction executor are unset, then a default
* {@code Executor} will be used. The default {@code Executor} allocates and shares threads
* amongst Architecture Components libraries. If the query executor is unset but a
* transaction executor was set, then the same {@code Executor} will be used for queries.
* <p>
* For best performance the given {@code Executor} should be bounded (max number of threads
* is limited).
<小时 />

示例

以下代码

dao.query().subscribe({
Log.d("CheckThread1", "onSuccess ${Thread.currentThread().name}")
}, {
Log.d("CheckThread1", "onError  ${Thread.currentThread().name}")
})

将导致:

D/CheckThread1: onSuccess arch_disk_io_0

如您所见,此操作不在主线程上处理。

是的,您可以在构建数据库时设置自己的执行器:

Room.databaseBuilder(
context.applicationContext,
Database::class.java,
"main.db"
)
.setTransactionExecutor(.../*your executor*/)
.setQueryExecutor(.../*your executor*/)
.build()

最新更新