RX Observable -测量执行时间(即使嵌套)



我使用附加的类来测量可观察对象的执行时间。这工作得很好,只要我不测量嵌套的可观察对象…

工作正常

Observable<T> obs1 = ...;
Observable<T> obs2 = ...;
obs1
    .compose(RXMeasure.applyMeasurement(T.class, "tag1"))
    .subscribe();
obs2
    .compose(RXMeasure.applyMeasurement(T.class, "tag2"))
    .subscribe();

不工作

Observable<T> obs3 = ...;
Observable<T> obs = obs1
    .flatMap(resul1 -> obs2)
    .flatMap(result2 -> obs3)
    .subscribe();

这导致的结果是,所有的可观察对象在开始时都被订阅了,这意味着测量不再是正确的。这里我试图依次执行3个可观察对象…

我想要的

我想要一个测量函数

  • 没有断链
  • 正在测量一个可观察对象的执行时间(类似于可观察对象开始/结束之间的时间,而不是订阅/终止)

任何想法?

<

RXMeasure类/strong>

public class RXMeasure
{
    private static boolean mEnabled = true;
    public static void setEnabled(boolean enabled)
    {
        mEnabled = enabled;
    }
    public static <T> Observable.Transformer<T, T> applyMeasurement(Class clazz, String tag)
    {
        return observable -> measure(observable, clazz, tag);
    }
    public static <T> Observable<T> measure(Observable<T> observable, Class clazz, String tag)
    {
        if (!mEnabled)
            return observable;
        LongHolder start = new LongHolder(0);
        return observable
                .doOnSubscribe(() -> start.set(System.currentTimeMillis()))
                .doOnTerminate(() -> L.d(clazz, "[" + tag + "] Duration: " + String.valueOf(System.currentTimeMillis() - start.get()) + "ms"));
    }
}

嗯…如果您想要按顺序运行,并且它们没有任何数据依赖关系,那么这样做不行吗?

Observable.concatMap(
  obs1.compose(applyMeasurement(T.class, "tag1")),
  obs2.compose(applyMeasurement(T.class, "tag2")),
  obs3.compose(applyMeasurement(T.class, "tag3")),
).subscribe();

否则,你需要更好地定义术语"执行时间",特别是对于一个可能被多次订阅的可观察对象。

编辑:看起来像一个普通的concat是一个解决方案:

Observable<Observable<T>> obss = Observable.just(
  obs1.compose(applyMeasurement(T.class, "tag1")),
  obs2.compose(applyMeasurement(T.class, "tag2")),
  obs3.compose(applyMeasurement(T.class, "tag3")),
);
Observable.concat(obss).subscribe();

我从文档和RxJava源代码的理解是,concat将在之前的Observable完成/取消订阅后订阅。

这是我目前为止最好的解决方案:

我只是强制我所有的可观察对象顺序运行,不是通过链接,而是通过一个辅助函数…

RXUtil.executeSequentially(
            observable -> {
                // subscribe the way you want
                observable.subscribe();
            },
            true,
            obs1,
            obs2,
            obs3);
}
<

RXUtil类/strong>

public class RXUtil 
{
    public static Observable executeSequentially(ISubscribe subscribe, boolean startObserving, Observable... observables)
    {
        for (int i = 0; i < observables.length - 1; i++)
        {
            final int fI = i;
            Observable observable = observables[i]
                    .doOnCompleted(new Action0() {
                        @Override
                        public void call() {
                            subscribe.subscribe(observables[(fI + 1)]);
                        }
                    });
            observables[i] = observable;
        }
        if (startObserving)
            subscribe.subscribe(observables[0]);
        return observables[0];
    }
    public interface ISubscribe<T>
    {
        void subscribe(Observable<T> observable);
    }
}

最新更新