我使用附加的类来测量可观察对象的执行时间。这工作得很好,只要我不测量嵌套的可观察对象…
工作正常
Observable<T> obs1 = ...;
Observable<T> obs2 = ...;
obs1
.compose(RXMeasure.applyMeasurement(T.class, "tag1"))
.subscribe();
obs2
.compose(RXMeasure.applyMeasurement(T.class, "tag2"))
.subscribe();
不工作
Observable<T> obs3 = ...;
Observable<T> obs = obs1
.flatMap(resul1 -> obs2)
.flatMap(result2 -> obs3)
.subscribe();
这导致的结果是,所有的可观察对象在开始时都被订阅了,这意味着测量不再是正确的。这里我试图依次执行3个可观察对象…
我想要的
我想要一个测量函数
- 没有断链
- 正在测量一个可观察对象的执行时间(类似于可观察对象开始/结束之间的时间,而不是订阅/终止)
任何想法?
<RXMeasure类/strong>
public class RXMeasure
{
private static boolean mEnabled = true;
public static void setEnabled(boolean enabled)
{
mEnabled = enabled;
}
public static <T> Observable.Transformer<T, T> applyMeasurement(Class clazz, String tag)
{
return observable -> measure(observable, clazz, tag);
}
public static <T> Observable<T> measure(Observable<T> observable, Class clazz, String tag)
{
if (!mEnabled)
return observable;
LongHolder start = new LongHolder(0);
return observable
.doOnSubscribe(() -> start.set(System.currentTimeMillis()))
.doOnTerminate(() -> L.d(clazz, "[" + tag + "] Duration: " + String.valueOf(System.currentTimeMillis() - start.get()) + "ms"));
}
}
嗯…如果您想要按顺序运行,并且它们没有任何数据依赖关系,那么这样做不行吗?
Observable.concatMap(
obs1.compose(applyMeasurement(T.class, "tag1")),
obs2.compose(applyMeasurement(T.class, "tag2")),
obs3.compose(applyMeasurement(T.class, "tag3")),
).subscribe();
否则,你需要更好地定义术语"执行时间",特别是对于一个可能被多次订阅的可观察对象。
编辑:看起来像一个普通的concat
是一个解决方案:
Observable<Observable<T>> obss = Observable.just(
obs1.compose(applyMeasurement(T.class, "tag1")),
obs2.compose(applyMeasurement(T.class, "tag2")),
obs3.compose(applyMeasurement(T.class, "tag3")),
);
Observable.concat(obss).subscribe();
我从文档和RxJava源代码的理解是,concat
将在之前的Observable完成/取消订阅后订阅。
这是我目前为止最好的解决方案:
我只是强制我所有的可观察对象顺序运行,不是通过链接,而是通过一个辅助函数…
RXUtil.executeSequentially(
observable -> {
// subscribe the way you want
observable.subscribe();
},
true,
obs1,
obs2,
obs3);
}
<RXUtil类/strong>
public class RXUtil
{
public static Observable executeSequentially(ISubscribe subscribe, boolean startObserving, Observable... observables)
{
for (int i = 0; i < observables.length - 1; i++)
{
final int fI = i;
Observable observable = observables[i]
.doOnCompleted(new Action0() {
@Override
public void call() {
subscribe.subscribe(observables[(fI + 1)]);
}
});
observables[i] = observable;
}
if (startObserving)
subscribe.subscribe(observables[0]);
return observables[0];
}
public interface ISubscribe<T>
{
void subscribe(Observable<T> observable);
}
}