RxJava回压和调用生产者的次数



我试图在我的Android应用程序中使用rx Java中的背压创建无限滚动。我希望它只调用外部服务请求次数(在调用request(1)之后)。但在使用平面地图后,每个subscribe加载16页。

下面的代码与预期的结果。几乎每个测试失败都是因为第一个请求(n=16)

import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import rx.Observable;
import rx.observers.TestSubscriber;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyList;
import static org.mockito.Mockito.*;
import static rx.internal.util.UtilityFunctions.identity;
public class ServiceObservablesTest {

    public static <T> Observable<List<T>> create(DataProvider<T> dataProvider) {
        Observable<Observable<List<T>>> metaObservalble = Observable.create(subscriber -> {
            AtomicInteger pageNumber = new AtomicInteger();
            subscriber.setProducer(n -> {
                // at subscribe rxJava makes request for 16 elements - probably because of flatMap
                // after first request with 16 elements everything seems to work fine even if i ignore the 'n' param
                Observable<List<T>> page = dataProvider.requestPage(pageNumber.getAndIncrement());
                subscriber.onNext(page);
            });
        });
        return metaObservalble.flatMap(identity()).takeWhile(page -> !page.isEmpty());
    }
    public interface DataProvider<T> {
        Observable<List<T>> requestPage(int page);
    }

    private DataProvider provider;
    @Before
    public void setUp() throws Exception {
        provider = Mockito.mock(DataProvider.class);
        List<Object> list = Arrays.asList(new Object());
        when(provider.requestPage(anyInt())).thenReturn(Observable.just(list));
    }
    @Test
    public void shouldRequestOnlyFirstPageOnSubscribe() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        //then
        subscriber.assertValueCount(1);
        subscriber.assertNotCompleted();
        verify(provider, times(1)).requestPage(0);
        verify(provider, never()).requestPage(1);
    }

    @Test
    public void shouldRequestNumberOfPagesSpecified() {
        //given
        int requested_pages = 5;
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(0);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(requested_pages);
        //then
        subscriber.assertValueCount(requested_pages);
        subscriber.assertNotCompleted();

        for (int i = 0; i < requested_pages; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(requested_pages);
    }

    @Test
    public void shouldCompleteAfterRetrievingEmptyResult() {
        //given
        int emptyPage = 2;
        when(provider.requestPage(emptyPage)).thenReturn(Observable.just(emptyList()));
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(100);
        Observable<List<Object>> flightsObservable = create(provider);

        //when
        flightsObservable.subscribe(subscriber);
        //then
        subscriber.assertValueCount(emptyPage);
        subscriber.assertCompleted();

        verify(provider, times(1)).requestPage(0); //requested at subscribe
        for (int i = 1; i <= emptyPage; i++) {
            verify(provider, times(1)).requestPage(i);
        }
        verify(provider, never()).requestPage(emptyPage + 1);
    }
    @Test
    public void shouldRequestNextPageWhenRequestedMore() {
        //given
        TestSubscriber<List<Object>> subscriber = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber);
        subscriber.requestMore(1);
        //then
        subscriber.assertValueCount(2);
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);
        //when
        subscriber.requestMore(1);
        //then
        subscriber.assertValueCount(3);
        subscriber.assertNotCompleted();
        verify(provider, times(1)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, times(1)).requestPage(2);
        verify(provider, never()).requestPage(3);
    }
    @Test
    public void shouldWorkWithMultipleSubscribers() {
        //given
        TestSubscriber<List<Object>> subscriber1 = new TestSubscriber<>(1);
        TestSubscriber<List<Object>> subscriber2 = new TestSubscriber<>(1);
        Observable<List<Object>> flightsObservable = create(provider);
        //when
        flightsObservable.subscribe(subscriber1);
        flightsObservable.subscribe(subscriber2);
        //then
        subscriber1.assertValueCount(1);
        subscriber2.assertValueCount(1);
        verify(provider, times(2)).requestPage(0);
        verify(provider, never()).requestPage(1);
        //when
        subscriber1.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(1);
        verify(provider, times(2)).requestPage(0);
        verify(provider, times(1)).requestPage(1);
        verify(provider, never()).requestPage(2);
        //when
        subscriber2.requestMore(1);
        //then
        subscriber1.assertValueCount(2);
        subscriber2.assertValueCount(2);
        verify(provider, times(2)).requestPage(0);
        verify(provider, times(2)).requestPage(1);
        verify(provider, never()).requestPage(2);
    }
}

反向压力旨在协商并发的消费者生产者行为,并允许程序作者设置策略来解决当产生的数据速率超过使用的数据速率时该怎么办。

也就是说,您将看到组合可观察值(如merge)的操作符将为您提供与所需数据量不对应的请求量。外部可观察对象(可观察对象中的可观察对象)在合并时总是会在RxAndroid上接收到16 (RxJava中是128)的请求。然后,当它接收List的内部可观察对象时,每个内部可观察对象将收到一个基于下游订阅者请求数量的请求。如果您试图编写一个Observable<Observable<T>>,您将被迫编写一个OnSubscribe<Observable<List<T>>>函数,该函数在内部管理合并行为,因此它是Observable<List<T>>而不是Observable<Observable<List<T>>。这样做会迫使你订阅数据提供程序返回的可观察对象来unwrap和onNext List<T>

我建议您将屏幕y位置映射到页面结束事件,然后使用扫描将其转换为单调递增的数字,然后将该数字concatMap转换为对DataProvider.requestPage()的调用。

screenYPositions
    .map(this::isUninitializedOrNearEndOfPage)
    .scan(1, (event, pageNumber) -> pageNumber + 1 )
    .concatMap(dataProvider::requestPage)
    .subscribe(testSubscriber);

相关内容

  • 没有找到相关文章

最新更新