如果上次回调尚未完成,则忽略传入的流更新



我有类似的代码。

IPruduceDemUpdates.Subscribe(update => DoUpdate(update));

但我想做的就是这样的事情。

IPruduceDemUpdates.Subscribe(update => if(NoDoUpadteIsRunning) DoUpdate(update));

因此,如果update方法已经在运行,它会忽略传入的更新。此外,它应该始终执行最后一次更新。无论这是流的最后一次更新还是一段时间内的最后一个更新。这里是一个示例时间轴

  • 更新1开始
  • 忽略更新2
  • 忽略更新3
  • 忽略更新4
  • 更新1已完成
  • 更新4开始
  • 更新4已完成

编辑

我有跳过的解决方案

        IPruduceDemUpdates.Subscribe(update =>
        {
            if (_task == null || _task.IsCompleted || _task.IsCanceled || _task.IsFaulted)
                _task = DoUpdate(update);
        });

但我不知道如何确定,最后一次更新会处理。

如果DoUpdate是同步的(在本例中似乎是同步的),则可以使用Rxx中的BufferIntrospective。它正是你想要的:

IProduceDemUpdates
    .BufferIntrospective()
    .Where(items => items.Count > 0) // ignore empty buffers
    .Select(items => items[items.Count - 1]) // ignore all but last item in buffer
    .Subscribe(DoUpdate);

我参加聚会有点晚了,但这里有一种内置的线性方法。带有TimeSpan.ZeroSample将持续对源进行采样,并始终推送最后一次更新-要实现这一点,您需要给Sample它自己的线程,以便采样的源可以不间断地运行:

IProduceDemUpdates.Sample(TimeSpan.Zero, NewThreadScheduler.Default)
                  .Subscribe(DoUpdate);

备注

线程按每个订阅者重用Sample,而不是按每个事件生成,因此无需担心创建大量线程。你也可以做.Sample(TimeSpan.Zero, new EventLoopScheduler())。关键是,订阅服务器在Sample采样的同一线程上被传递事件,因此Sample在订阅服务器完成之前不会对下一个事件进行采样。

示例

以下代码:

// emits 0,1,2,3,4,5,6,7,8,9
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(10); 
source.Sample(TimeSpan.Zero, NewThreadScheduler.Default).Subscribe(x => {
        Console.WriteLine(x);
        Task.Delay(TimeSpan.FromSeconds(3)).Wait();
});

通常输出:

0
2
5
9

要说服自己总是收到最后一次更新,请尝试:

// emit 0,1,2
var source = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);
source.Sample(TimeSpan.Zero, NewThreadScheduler.Default).Subscribe(x => {
        Console.WriteLine(x);
        Task.Delay(TimeSpan.FromSeconds(10)).Wait();
});

将输出:

0
2

洞穴

无论您的方法是什么,请注意在连接到Sample的订户的同步链中完成最慢的操作是多么重要。如果您在下游有异步订阅(如ObserveOn),其中包含较慢的运算符,那么您只会将背压转移到其他地方。

相关内容

  • 没有找到相关文章

最新更新