我想在一个限制条件下运行周期性任务,即在任何给定的时间内,一个方法最多只能执行一次。
我正在试验Rx,但我不确定如何施加最多一次并发限制。
var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());
另外,如果一个任务仍在运行,我希望后续的调度结束。也就是说,我不希望任务排队,造成问题。
我有两个这样的任务要定期执行。当前正在执行的任务是同步的。但是,如果有必要,我可以让它们异步
你是在正确的轨道上,你可以使用Select
+ Concat
来平化可观察到的并限制飞行请求的数量(注意:如果你的任务需要的时间比间隔时间长,那么它们将开始堆积,因为它们不能执行得足够快):
var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
//I assume you are doing async work since you want to limit concurrency
.Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
//This is equivalent to calling Merge(1)
.Concat();
source.Subscribe(/*Handle the result of each operation*/);
您应该按原样测试您的代码,因为这正是Rx已经规定的。
试试这个测试:
void Main()
{
var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
{
Console.ReadLine();
}
}
private void DoSomething()
{
Console.Write("<");
Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
Thread.Sleep(1000);
Console.WriteLine(">");
}
当你运行这个命令时,你会得到这样的输出:
!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119
它已经确保没有重叠
下面是PeriodicSequentialExecution
方法的两个实现,它们通过周期性地执行异步方法来创建一个可观察对象,强制执行无重叠执行策略。后续执行之间的间隔可以延长以防止重叠,在这种情况下,周期将相应地进行时移。
第一个实现是纯函数式的,而第二个实现主要是命令式的。这两种实现在功能上是相同的。第一个可以与定制的IScheduler
一起提供。第二种方法可能会稍微有效一些。
功能实现:
/// <summary>
/// Creates an observable sequence containing the results of an asynchronous
/// action that is invoked periodically and sequentially (without overlapping).
/// </summary>
public static IObservable<T> PeriodicSequentialExecution<T>(
Func<CancellationToken, Task<T>> action,
TimeSpan dueTime, TimeSpan period,
CancellationToken cancellationToken = default,
IScheduler scheduler = null)
{
// Arguments validation omitted
scheduler ??= DefaultScheduler.Instance;
return Delay(dueTime) // Initial delay
.Concat(Observable.Using(() => CancellationTokenSource.CreateLinkedTokenSource(
cancellationToken), linkedCTS =>
// Execution loop
Observable.Publish( // Start a hot delay timer before each operation
Delay(period), hotTimer => Observable
.StartAsync(() => action(linkedCTS.Token)) // Start the operation
.Concat(hotTimer) // Await the delay timer
)
.Repeat()
.Finally(() => linkedCTS.Cancel()) // Unsubscription: cancel the operation
));
IObservable<T> Delay(TimeSpan delay)
=> Observable
.Timer(delay, scheduler)
.IgnoreElements()
.Select(_ => default(T))
.TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
o.OnError(new OperationCanceledException(cancellationToken)))));
}
命令式实现:
public static IObservable<T> PeriodicSequentialExecution2<T>(
Func<CancellationToken, Task<T>> action,
TimeSpan dueTime, TimeSpan period,
CancellationToken cancellationToken = default)
{
// Arguments validation omitted
return Observable.Create<T>(async (observer, ct) =>
{
using (var linkedCTS = CancellationTokenSource.CreateLinkedTokenSource(
ct, cancellationToken))
{
try
{
await Task.Delay(dueTime, linkedCTS.Token);
while (true)
{
var delayTask = Task.Delay(period, linkedCTS.Token);
var result = await action(linkedCTS.Token);
observer.OnNext(result);
await delayTask;
}
}
catch (Exception ex) { observer.OnError(ex); }
}
});
}
cancellationToken
参数可用于优雅地终止结果可观察序列。这意味着序列在终止之前等待当前运行的操作完成。如果您希望它立即终止,可能会使工作以"即发即弃"的方式运行,那么您可以像往常一样,简单地解除对可观察序列的订阅。取消cancellationToken
将导致可观察序列以故障状态(OperationCanceledException
)完成。
这是一个完全符合您要求的工厂函数。
public static IObservable<Unit> Periodic(TimeSpan timeSpan)
{
return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat());
}
下面是一个使用
的例子Periodic(TimeSpan.FromSeconds(1))
.Subscribe(x =>
{
Console.WriteLine(DateTime.Now.ToString("mm:ss:fff"));
Thread.Sleep(500);
});
如果运行此命令,每次控制台打印将相隔大约1.5秒。
注意,如果你不希望第一个tick立即运行,你可以使用这个工厂,它不会发送第一个Unit直到时间范围之后。
public static IObservable<Unit> DelayedPeriodic(TimeSpan timeSpan)
{
return Observable.Return(Unit.Default).Delay(timeSpan).Repeat();
}