使用一个可观察对象作为时钟来测试另一个对象的超时



我想做什么的声明如下所示:

// Checks input source for timeouts, based on the number of elements received 
// from clock since the last one received from source. 
// The two selectors are used to generate output elements.
public static IObservable<R> TimeoutDetector<T1,T2,R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, 
        int countForTimeout,
        Func<R> timedOutSelector, 
        Func<T1, R> okSelector)

弹珠图在ascii中很困难,但这里是:

source --o---o--o-o----o-------------------o---
clock  ----x---x---x---x---x---x---x---x---x---
output --^---^--^-^----^-----------!-------^---

我尝试寻找现有的Observable函数,这些函数可以以我可以使用的方式组合sourceclock,但大多数组合函数依赖于接收"每个中的一个"(AndZip),或者它们从"缺失的"值(CombineLatest)中重新返回"上一个"值,或者它们离我需要的太远了(AmbGroupJoinJoinMergeSelectManyTimeout)。 Sample看起来很接近,但我不想将源吞吐量限制为时钟速率。

所以现在我试图填补这里的大空白:

return new AnonymousObservable<R>(observer =>
{
    //One observer, two observables??
});

抱歉,这里的"你尝试过什么"部分有点弱:假设我已经尝试过思考它!我不要求完全实现,只是:

  • 是否有内置功能可以帮助我错过?
  • 如何构建订阅两个可观察量的基于 lambda 的观察器?
我知道

你没有要求完整的实现,但我认为这是一个解决方案:

public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
    this IObservable<T1> source,
    IObservable<T2> clock,
    int countForTimeout,
    Func<TR> timedOutSelector,
    Func<T1, TR> okSelector)
{
    return source
        .Select(i => clock.Take(countForTimeout).LastAsync())
        .Switch().Select(_ => timedOutSelector())
        .Merge(source.Select(okSelector));
}

它的工作原理如下 - 我注意到您的输出是由 okSelector 投影的源,与超时事件合并。因此,我专注于生成超时事件,因为其余的都很容易。

这个想法是每次源发射时创建一个倒计时,并在每个时钟脉冲上减少这个倒计时。如果源发出,我们将中止倒计时,否则当倒计时达到 0 时,我们将生成 timedOut 事件。

分解一下:

  1. 将每个源元素投影到一个接受前countForTimeout元素的流中 - 请注意,时钟流必须是"热"可观察的,因为我们在每个倒计时事件中订阅它。时钟流很热是很正常的。如果这发生事件,我们将超时。
  2. Switch将抛弃除最新倒计时流之外的所有内容。
  3. 使用Select投影到超时事件。
  4. 现在只需合并源事件。

这是我使用的单元测试,旨在与您的大理石图非常相似(nuget rx-testing和nunit用于编译所需的库):

    [Test]
    public void AKindOfTimeoutTest()
    {
        var scheduler = new TestScheduler();
        var clockStream = scheduler.CreateHotObservable(
            OnNext(100, Unit.Default),
            OnNext(200, Unit.Default),
            OnNext(300, Unit.Default),
            OnNext(400, Unit.Default),
            OnNext(500, Unit.Default),
            OnNext(600, Unit.Default),
            OnNext(750, Unit.Default), /* make clock funky! */
            OnNext(800, Unit.Default),
            OnNext(900, Unit.Default));

        var sourceStream = scheduler.CreateColdObservable(
            OnNext(50, 1),
            OnNext(150, 2),
            OnNext(250, 3),
            OnNext(275, 4),
            OnNext(400, 5),
            OnNext(900, 6));

        Func<int> timedOutSelector = () => 0;
        Func<int, int> okSelector = i => i;
        var results = scheduler.CreateObserver<int>();
        sourceStream.TimeoutDetector(clockStream, 3, timedOutSelector, okSelector)
                    .Subscribe(results);
        scheduler.Start();
        results.Messages.AssertEqual(
            OnNext(50, 1),
            OnNext(150, 2),
            OnNext(250, 3),
            OnNext(275, 4),
            OnNext(400, 5),
            OnNext(750, 0),
            OnNext(900, 6));
    }
}

要尝试回答您的具体问题,请执行以下操作:

  • 问。是否有内置功能可以帮助我错过?答:扫描可能是关键。
  • 问。如何构建订阅两个可观察量的基于 lambda 的观察器?一个。不太确定你的意思...有很多方法可以组合流,您提到了其中的大多数。

这是我提到的 Observable.Create 方法(相同的测试有效):

public static IObservable<TR> TimeoutDetector<T1, T2, TR>(
    this IObservable<T1> source,
    IObservable<T2> clock,
    int countForTimeout,
    Func<TR> timedOutSelector,
    Func<T1, TR> okSelector)
{
    return Observable.Create<TR>(observer =>
        {
            var counter = countForTimeout;
            var timeoutSub = clock.Subscribe(_ =>
                {
                    var count = Interlocked.Decrement(ref counter);
                    if (count == 0)
                    {
                        observer.OnNext(timedOutSelector());
                    }
                },
                observer.OnError,
                observer.OnCompleted);
            var sourceSub = source.Subscribe(
                i =>
                {
                    Interlocked.Exchange(ref counter, countForTimeout);
                    observer.OnNext(okSelector(i));
                },
                observer.OnError,
                observer.OnCompleted);
            return new CompositeDisposable(sourceSub, timeoutSub);
        });
}

请注意,Observable.Create 对于确保使用正确的 Rx 语法非常有帮助(即流发出 OnNext* (OnError |已完成)?- 这意味着我可以稍微放松一次发送OnError或OnComplete。

我想

出了这个,这比詹姆斯的答案更不漂亮。

public static IObservable<R> TimeoutDetector2<T1, T2, R>(
        this IObservable<T1> source, 
        IObservable<T2> clock, int maxDiff,
        Func<R> timedOutSelector, Func<T1, R> okSelector)
{
    return new AnonymousObservable<R>(observer =>
    {
        int counter = 0;
        object gate = new object();
        bool error = false;
        bool completed = false;
        bool timedOut = false;
        var sourceSubscription = source.Subscribe(
            x =>
            {
                lock(gate)
                {
                    if(!error && !completed) observer.OnNext(okSelector(x));
                    counter = 0;
                    timedOut = false;
                }
            },
            ex =>
            {
                lock(gate)
                {
                    error = true;
                    if(!completed) observer.OnError(ex);
                }
            },
            () =>
            {
                lock(gate)
                {
                    completed = true;
                    if(!error) observer.OnCompleted();
                }
            });
        var clockSubscription = clock.Subscribe(
            x =>
            {
                lock(gate)
                {
                    counter = counter + 1;
                    if(!error && !completed && counter > maxDiff && !timedOut)
                    {
                        timedOut = true;
                        observer.OnNext(timedOutSelector());
                    }
                }
            },
            ex =>
            {
                lock(gate)
                {
                    error = true;
                    if(!completed) observer.OnError(ex);
                }
            },
            () =>
            {
                lock(gate)
                {
                    completed = true;
                    if(!error) observer.OnCompleted();
                }
            });
        //need to return a subscription
        return new CompositeDisposable(sourceSubscription, clockSubscription);
    }).Publish().RefCount(); // prevent subscribers provoking more than one subscription to source and clock
}

当然这是一个老问题。我一直在寻找比timeoutWith更高级的东西,以便我可以取消超时逻辑。

我想知道超时是否几乎等同于:

race(throwError('timedout').pipe(delay(10000)), yourObs$)

那么这里显示的"throwError"当然是可以取消的。

如果你想知道为什么 - 我有一些由可观察链控制的"步骤",我有一个超时。但是,如果其中一个步骤包括打开对话框,那么我希望取消超时!

相关内容

  • 没有找到相关文章

最新更新