Rx滑动窗口-适当清理



我一直在尝试建立一个断路器,可以配置为跳闸复合规则,如:

  • 45秒内2次超时异常;或
  • 5分钟内任何其他类型异常的50

所以我认为最好的方法是使用滑动窗口(或者时间缓冲区,随便你怎么称呼它们)。

根据我在网上找到的东西和我自己拼凑的东西,我写了这个简单的控制台应用程序:

static IEnumerable<ConsoleKeyInfo> KeyPresses()
{
    ConsoleKeyInfo key;
    do
    {
        key = Console.ReadKey();
        yield return key;
    } while (key.Key != ConsoleKey.Escape);
}
static IObservable<int> TimeLimitedThreshold<T>(IObservable<T> source, TimeSpan timeLimit, int threshold)
{
    return source.Window(source, _ => Observable.Timer(timeLimit))
        .Select(x => x.Count())
        .Merge()
        .Where(count => count >= threshold)
        .Take(1);
}
static void Main(string[] args)
{
    Console.WriteLine("Starting");
    var timeLimit = TimeSpan.FromSeconds(5);
    const int threshold = 3;
    var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();
    var thresholdHit = TimeLimitedThreshold(keys, timeLimit, threshold);
    thresholdHit.Subscribe(count => Console.WriteLine("THRESHOLD BREACHED! Count is: {0}", count));
    // block the main thread so we don't terminate
    keys.Where(key => key.Key == ConsoleKey.Escape).FirstAsync().Wait();
    Console.WriteLine("Finished");
}

(如果我应该把它放在要点或粘贴文件中,而不是放在问题中,请直接说出来)

现在这似乎做了我想要的,如果我在5秒内按任何键3次或更多次,"阈值突破!!"打印一次,没有更多的事情发生。

我的问题是:

  • TimeLimitedThreshold函数是否在一个合理的抽象级别上与Rx.Net一起工作?
  • 我应该注入调度使用时创建Observable.Timer()?
  • 如果有的话,我错过了什么清理?涉及到Rx时的内存使用情况。. NET真的让我很困惑。

我可能会这样写阈值函数,利用Timestamp组合子。

    public static IObservable<U> TimeLimitedThreshold
        <T,U>
        ( this IObservable<T> source
        , int count
        , TimeSpan timeSpan
        , Func<IList<T>,U> selector
        , IScheduler scheduler = null
        )
    {
        var tmp = scheduler == null
            ? source.Timestamp()
            : source.Timestamp(scheduler);
        return  tmp
             .Buffer(count, 1).Where(b=>b.Count==count)
             .Select(b => new { b, span = b.Last().Timestamp - b.First().Timestamp })
             .Where(o => o.span <= timeSpan)
             .Select(o => selector(o.b.Select(ts=>ts.Value).ToList()));
    }

作为一个额外的方便,当触发器被触发时,满足触发器的完整缓冲区被提供给你的选择器函数。

例如

 var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();
 IObservable<string> fastKeySequences = keys.TimeLimitedThreshHold
     ( 3
     , TimeSpan.FromSeconds(5)
     , keys => String.Join("", keys)
     );

给出额外的IScheduler参数,因为Timestamp方法有一个额外的过载,取1。这可能是有用的,如果你想有一个自定义调度程序,不跟踪时间根据内部时钟。对于测试目的,使用历史调度程序可能是有用的,然后您将需要额外的过载。

,这里是一个完全工作的测试,显示了一个时间表的使用。(使用XUnit和FluentAssertions为Should().Be(…))

public class TimeLimitedThresholdSpec  : ReactiveTest
{
    TestScheduler _Scheduler = new TestScheduler();
    [Fact]
    public void ShouldWork()
    {
        var o = _Scheduler.CreateColdObservable
            ( OnNext(100, "A")
            , OnNext(200, "B")
            , OnNext(250, "C")
            , OnNext(255, "D")
            , OnNext(258, "E")
            , OnNext(600, "F")
            );
        var fixture = o
            .TimeLimitedThreshold
                (3
                , TimeSpan.FromTicks(20)
                , b => String.Join("", b)
                , _Scheduler
                );
        var actual = _Scheduler
            .Start(()=>fixture, created:0, subscribed:1, disposed:1000);
        actual.Messages.Count.Should().Be(1);
        actual.Messages[0].Value.Value.Should().Be("CDE");

    }
}

订阅and的方式如下

IDisposable subscription = fastKeySequences.Subscribe(s=>Console.WriteLine(s));

,当您想要取消订阅(清理内存和资源)时,您将处理订阅。简单。

subscription.Dispose()

这里有一种替代方法,它使用单个延迟而不是缓冲区和计时器。它不给你事件——它只是在有冲突的时候发出信号——但是它使用更少的内存,因为它不会占用太多的内存。

public static class ObservableExtensions
{
    public static IObservable<Unit> TimeLimitedThreshold<TSource>(
        this IObservable<TSource> source,
        long threshold,
        TimeSpan timeLimit,
        IScheduler s)
    {
        var events = source.Publish().RefCount();
        var count = events.Select(_ => 1)
                        .Merge(events.Select(_ => -1)
                                    .Delay(timeLimit, s));                                               
        return count.Scan((x,y) => x + y)              
                    .Where(c => c == threshold)
                    .Select(_ => Unit.Default);           
    }
}

Publish().RefCount()用于避免订阅多个源。查询将所有事件投射到1,并将延迟的事件流投射到-1,然后生成运行总数。如果运行总数达到阈值,我们发出一个信号(Unit.Default是表示没有有效负载的事件的Rx类型)。下面是一个测试(仅在LINQPad中使用nuget rx-testing运行):

void Main()
{    
    var s = new TestScheduler();
    var source = s.CreateColdObservable(
        new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
        new Recorded<Notification<int>>(200, Notification.CreateOnNext(2)),
        new Recorded<Notification<int>>(300, Notification.CreateOnNext(3)),
        new Recorded<Notification<int>>(330, Notification.CreateOnNext(4)));
    var results = s.CreateObserver<Unit>();
    source.TimeLimitedThreshold(
        2,
        TimeSpan.FromTicks(30),
        s).Subscribe(results);
    s.Start();
    ReactiveAssert.AssertEqual(
        results.Messages,
        new List<Recorded<Notification<Unit>>> {
            new Recorded<Notification<Unit>>(
                330, Notification.CreateOnNext(Unit.Default))
        });
}

编辑

在Matthew Finlay观察到当阈值"在下降的过程中"通过时,上面的代码也会触发,我添加了这个版本,只检查阈值在正方向上的跨越:

public static class ObservableExtensions
{
    public static IObservable<Unit> TimeLimitedThreshold<TSource>(
        this IObservable<TSource> source,
        long threshold,
        TimeSpan timeLimit,
        IScheduler s)
    {
        var events = source.Publish().RefCount();
        var count = events.Select(_ => 1)
                        .Merge(events.Select(_ => -1)
                                    .Delay(timeLimit, s));                                                   
        return count.Scan((x,y) => x + y)          
                    .Scan(new { Current = 0, Last = 0},
                          (x,y) => new { Current = y, Last = x.Current })                        
                    .Where(c => c.Current == threshold && c.Last < threshold)
                    .Select(_ => Unit.Default);                          
    }
}

相关内容

  • 没有找到相关文章

最新更新