使用 Rx 运行具有单个并发执行限制的定期任务的好方法是什么?



我想在一个限制条件下运行周期性任务,即在任何给定的时间内,一个方法最多只能执行一次。

我正在试验Rx,但我不确定如何施加最多一次并发限制。

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());

另外,如果一个任务仍在运行,我希望后续的调度结束。也就是说,我不希望任务排队,造成问题。

我有两个这样的任务要定期执行。当前正在执行的任务是同步的。但是,如果有必要,我可以让它们异步

你是在正确的轨道上,你可以使用Select + Concat来平化可观察到的并限制飞行请求的数量(注意:如果你的任务需要的时间比间隔时间长,那么它们将开始堆积,因为它们不能执行得足够快):

var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
          //I assume you are doing async work since you want to limit concurrency
          .Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
          //This is equivalent to calling Merge(1)
          .Concat();
source.Subscribe(/*Handle the result of each operation*/);

您应该按原样测试您的代码,因为这正是Rx已经规定的。

试试这个测试:

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}
private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}

当你运行这个命令时,你会得到这样的输出:

!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119

它已经确保没有重叠

下面是PeriodicSequentialExecution方法的两个实现,它们通过周期性地执行异步方法来创建一个可观察对象,强制执行无重叠执行策略。后续执行之间的间隔可以延长以防止重叠,在这种情况下,周期将相应地进行时移。

第一个实现是纯函数式的,而第二个实现主要是命令式的。这两种实现在功能上是相同的。第一个可以与定制的IScheduler一起提供。第二种方法可能会稍微有效一些。

功能实现:

/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default,
    IScheduler scheduler = null)
{
    // Arguments validation omitted
    scheduler ??= DefaultScheduler.Instance;
    return Delay(dueTime) // Initial delay
        .Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
            cancellationToken), linkedCTS => 
            // Execution loop
            Observable.Publish( // Start a hot delay timer before each operation
                Delay(period), hotTimer => Observable
                    .StartAsync(() => action(linkedCTS.Token)) // Start the operation
                    .Concat(hotTimer) // Await the delay timer
            )
            .Repeat()
            .Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
        ));
    IObservable<T> Delay(TimeSpan delay)
        => Observable
            .Timer(delay, scheduler)
            .IgnoreElements()
            .Select(_ => default(T))
            .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
                o.OnError(new OperationCanceledException(cancellationToken)))));
}

命令式实现:

public static IObservable<T> PeriodicSequentialExecution2<T>(
    Func<CancellationToken, Task<T>> action,
    TimeSpan dueTime, TimeSpan period,
    CancellationToken cancellationToken = default)
{
    // Arguments validation omitted
    return Observable.Create<T>(async (observer, ct) =>
    {
        using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
            ct, cancellationToken))
        {
            try
            {
                await Task.Delay(dueTime, linkedCTS.Token);
                while (true)
                {
                    var delayTask = Task.Delay(period, linkedCTS.Token);
                    var result = await action(linkedCTS.Token);
                    observer.OnNext(result);
                    await delayTask;
                }
            }
            catch (Exception ex) { observer.OnError(ex); }
        }
    });
}

cancellationToken参数可用于优雅地终止结果可观察序列。这意味着序列在终止之前等待当前运行的操作完成。如果您希望它立即终止,可能会使工作以"即发即弃"的方式运行,那么您可以像往常一样,简单地解除对可观察序列的订阅。取消cancellationToken将导致可观察序列以故障状态(OperationCanceledException)完成。

这是一个完全符合您要求的工厂函数。

public static IObservable<Unit> Periodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat());
}

下面是一个使用

的例子
Periodic(TimeSpan.FromSeconds(1))
    .Subscribe(x =>
    {
        Console.WriteLine(DateTime.Now.ToString("mm:ss:fff"));
        Thread.Sleep(500);
    });

如果运行此命令,每次控制台打印将相隔大约1.5秒。

注意,如果你不希望第一个tick立即运行,你可以使用这个工厂,它不会发送第一个Unit直到时间范围之后。

public static IObservable<Unit> DelayedPeriodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Delay(timeSpan).Repeat();
}

相关内容

  • 没有找到相关文章

最新更新