OutOfMemoryException 使用 retrofit2 和 rx java2 发送大量 POST 时



我有一个带有本地数据库(房间)的应用程序和一个服务,可以使用retrofit 2rxjavaPOSTs数据库中的所有"事件"。当我发送大量POSTs(即 1500+)时,应用程序会抛出OutOfMemoryException。我认为发生这种情况是因为每次客户端发送新的 POST 时它都会启动一个新线程。有没有办法防止retrofit/ rxJava创建这么多线程? 还是等待服务器响应更好?这是我的代码:

从本地数据库检索所有事件的类

public class RetreiveDbContent {
private final EventDatabase eventDatabase;
public RetreiveDbContent(EventDatabase eventDatabase) {
this.eventDatabase = eventDatabase;
}
@Override
public Maybe<List<Event>> eventsList() {
return eventDatabase.eventDao().getAllEvents()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
}
}

接下来,我有一个服务,它通过数据库事件列表迭代并发布所有事件。如果后端发回成功,则会从本地数据库中删除该事件。

private void sendDbContent() {
mRetreiveDbContent.eventsList()
.subscribe(new MaybeObserver<List<Event>>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onSuccess(final List<Event> events) {

Timber.e("Size of list from db " + events.size());
final CompositeDisposable disposable = new CompositeDisposable();
Observable<Event> eventObservable = Observable.fromIterable(events);
eventObservable.subscribe(new Observer<Event>() {
@Override
public void onSubscribe(Disposable d) {
disposable.add(d);
}
@Override
public void onNext(Event event) {
Timber.d("sending event from db " + event.getAction());
mPresenter.postEvent(Event);
}
@Override
public void onError(Throwable e) {
Timber.e("error while emitting db content " + e.getMessage());
}
@Override
public void onComplete() {
Timber.d("Finished looping through db list");
disposable.dispose();
}
});
}
@Override
public void onError(Throwable e) {
Timber.e("Error occurred while attempting to get db content " + e.getMessage());
}
@Override
public void onComplete() {
Timber.d("Finished getting the db content");
}
});
}

这是我postEvent()deleteEvent()方法,存在于演示者中

public void postEvent(final Event event) {
mSendtEvent.sendEvent(event)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new DisposableObserver<Response<ResponseBody>>() {
@Override
public void onNext(Response<ResponseBody> responseBodyResponse) {
switch (responseBodyResponse.code()) {
case CREATED_RESPONSE:
Timber.d("Event posted successfully " + responseBodyResponse.code());
deleteEventFromRoom(event);
break;
case BAD_REQUEST:
Timber.e("Client sent a bad request! We need to discard it!");
break;
}
}
@Override
public void onError(Throwable e) {
Timber.e("Error " + e.getMessage());
mView.onErrorOccurred();
}
@Override
public void onComplete() {
}
});
}

public void deleteEventFromRoom(final Event event) {
final CompositeDisposable disposable = new CompositeDisposable();
mRemoveEvent.removeEvent(event)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer() {
@Override
public void onSubscribe(Disposable d) {
disposable.add(d);
}
@Override
public void onNext(Object o) {
Timber.d("Successfully deleted event from database " + event.getAction());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
disposable.dispose();
}
});
}

最后mRemoveEvent交互器

public class RemoveEvent {
private final EventDatabase eventDatabase;
public RemoveEvent(EventDatabase eventDatabase) {
this.eventDatabase = eventDatabase;
}
@Override
public Observable removeEvent(final Event event) {
return Observable.fromCallable(new Callable<Object>() {
@Override
public Object call() throws Exception {
return eventDatabase.eventDao().delete(event);
}
});
}
}

注意:我是RXJava世界的新手。 提前谢谢你

您正在使用不支持背压的Observable

Fom RxJava github页面:

背压

当数据流通过异步步骤运行时,每个步骤都可能 以不同的速度执行不同的事情。避免压倒性 这样的步骤,通常表现为记忆增加 由于临时缓冲或需要跳过/丢弃而使用 数据,施加所谓的背压,这是一种流动形式 控制步骤可以表示它们准备好的项目数的位置 过程。这允许限制数据流的内存使用 通常无法让一个步骤知道多少个的情况 上游将向其发送的项目。

在 RxJava 中,专用的 Flowable类被指定为 支持 背压和可观察专用于非背压 操作(短序列、GUI 交互等)。其他类型, Single、May 和 Completable 不支持背压,也不应该 他们;总是有空间暂时存放一件物品。

你应该使用Flowable,你正在将所有事件发送到下游,以便使用所有可用的资源进行处理。

下面是一个简单的示例:

Flowable.range(1, 1000)
.buffer(10)//Optional you can process single event
.flatMap(buf -> {
System.out.println(String.format("100ms for sending events to server: %s ", buf));
Thread.sleep(100);
return Flowable.fromIterable(buf);
}, 1)// <-- How many concurrent task should be executed
.map(x -> x + 1)
.doOnNext(i -> System.out.println(String.format("doOnNext: %d", i)))
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single(), false, 1)//Overrides the 128 default buffer size
.subscribe(new DefaultSubscriber<Integer>() {
@Override
public void onStart() {
request(1);
}
@Override
public void onNext(Integer t) {
System.out.println(String.format("Received response from server for event : %d", t));
System.out.println("Processing value would take some time");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//You can request for more data here
request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("ExampleUnitTest.onComplete");
}
});

最后一个提示:您不应该一次将整个事件提取到内存中,基本上您将所有"数据库事件"保存在内存中,考虑分页或类似Cursor,每个操作获取 100 行,并在处理它们后请求接下来的 100 行,我希望您使用 JobScheduler 或 WorkManager API 来执行此操作

您没有在此处添加任何错误日志,因此我不知道问题的确切原因,但是根据您的代码,您正在从本地数据库获取所有事件,并且 然后,从列表中迭代事件,并将每个事件发送到服务器,然后处理响应。 现在你说有 400-500 个条目,代码工作正常,而 ~1500 个事件它会崩溃,现在你需要明白,在这两种情况下,你的网络一次向服务器发送一个事件,所以问题可能在于你一次获取所有数据的方法。

因此,与其一次从 localdb 获取所有数据,不如一次获取一个事件,然后将其上传到服务器。

相关内容

  • 没有找到相关文章

最新更新