在后台线程上运行RxWorker



我需要运行一个Observable,它在后台线程上进行大量处理,这样用户就不会觉得应用程序冻结了。

我已经尝试过覆盖getBackgroundScheduler(),但应用程序一直处于冻结状态。

@Override
protected Scheduler getBackgroundScheduler() {
return Schedulers.computation();
}

并尝试使用.subscribeOn(Schedulers.computation())进行订阅,但一无所获。

我正在将WorkManager与RxJava一起使用。

这是我的工人阶级:

public class Worker extends RxWorker {
public Worker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
super(appContext, workerParams);
}
@NonNull
@Override
public Single<Result> createWork() {
return Observables.getCounter()
.map(currentValue -> Result.success()) // map is running on a background thread.
.lastOrError();
}
}

这是繁重的工作:

public class Observables {
public static Observable<Integer> getCounter() {
return Observable.just(1, 2, 3); // This is running on the main thread.
}
}

只需使用single.create((创建您的单曲。这将使您的繁重工作在订阅线程上运行。最好重写onStopped((方法来处理一次性的。

你重写的课程:

public class Worker extends RxWorker {
private final CompositeDisposable compositeDisposable;
public Worker(@NonNull Context appContext, @NonNull WorkerParameters workerParams) {
super(appContext, workerParams);
compositeDisposable = new CompositeDisposable();
}
@NonNull
@Override
public Single<Result> createWork() {
return Single.create(emitter -> {
compositeDisposable.add(Observables.getCounter().subscribe(
next -> emitter.onSuccess(Result.success()),
throwable -> emitter.onSuccess(Result.failure())
));
});
}
@Override
public void onStopped() {
super.onStopped();
compositeDisposable.dispose();
}
}

最新更新