Cron 可观察序列



我想使用反应式扩展(RX(和NCrontab创建一个可观察序列。该顺序与Observable.Timer()的不同之处在于,期限和到期时间不是固定的。阅读本文后,似乎Observable.Generate()是要走的路。我正在考虑两种变体:一种在边界内运行,另一种永远运行。这些实现有意义吗?

public static IObservable<DateTime> Cron(string cron)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(DateTime.Now, d=>true, d => DateTime.Now, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}
public static IObservable<DateTime> Cron(string cron, DateTime start, DateTime end)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(start, d => d < end, d => DateTime.Now, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(d)));
}

更新:这些似乎在经验上起作用,但是我添加了一个重载,它需要IScheduler并且似乎无法在单元测试中触发序列。我使用TestScheduler错误还是函数实现有问题?

public static IObservable<int> Cron(string cron, IScheduler scheduler)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(0, d => true, d => d + 1, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.DateTime)), scheduler);
}
[TestClass]
public class EngineTests
{
    [TestMethod]
    public void TestCron()
    {
        var scheduler = new TestScheduler();
        var cron = "* * * * *";
        var values = new List<int>();
        var disp = ObservableCron.Cron(cron, scheduler).Subscribe(values.Add);
        scheduler.AdvanceBy(TimeSpan.TicksPerMinute - 1);
        scheduler.AdvanceBy(1);
        scheduler.AdvanceBy(1);
        Assert.IsTrue(values.Count> 0);
    }
}

它看起来像是问题的组合。首先,我正在使用的Observable.Generate重载采用Func<int,DateTimeOffset>参数来确定下次触发的时间。我正在传递一个基于调度程序的本地时间而不是 Utc 的新DateTimeOffset,这导致新的"日期时间偏移量"移动。有关解释,请参阅此问题。正确的功能如下:

public static IObservable<int> Cron(string cron, IScheduler scheduler)
{
    var schedule = CrontabSchedule.Parse(cron);
    return Observable.Generate(0, d => true, d => d + 1, d => d,
        d => new DateTimeOffset(schedule.GetNextOccurrence(scheduler.Now.UtcDateTime)), scheduler);
}

就测试而言,我想出了一些可以更好地展示意图的东西:

[TestMethod]
public void TestCronInterval()
{
    var scheduler = new TestScheduler();
    var end = scheduler.Now.UtcDateTime.AddMinutes(10);
    const string cron = "*/5 * * * *";
    var i = 0;
    var seconds = 0;
    var sub = ObservableCron.Cron(cron, scheduler).Subscribe(x => i++);
    while (i < 2)
    {
        seconds++;
        scheduler.AdvanceBy(TimeSpan.TicksPerSecond);
    }
    Assert.IsTrue(seconds == 600);
    Assert.AreEqual(end, scheduler.Now.UtcDateTime);
    sub.Dispose();
}

我在没有使用 Cronos 的调度程序的情况下使用了这个解决方案:

public static IObservable<DateTimeOffset> ToObservable(this ICronScheduleObservableConfiguration configuration)
{
    Validate(configuration);
    var schedule = configuration.Expression;
    DateTimeOffset? next = null;
    return Observable.Generate(
        DateTimeOffset.Now,
            i => true,
            i => (next = schedule.GetNextOccurrence(i, configuration.TimeZone)) ?? DateTimeOffset.Now,
            i => next,
            i => i
        ).
        Where(i => i.HasValue).
        Select(i => i.Value);
}
public interface ICronScheduleObservableConfiguration :
    IObservableConfiguration
{
    /// <summary>
    /// Cron schedule with format: https://github.com/HangfireIO/Cronos#cron-format
    /// </summary>
    /// <value>Non-empty</value>
    string Schedule { get; }
    /// <summary>
    /// <see cref="Schedule"/> format
    /// </summary>
    CronFormat Format { get; }
    /// <summary>
    /// Parsed <see cref="Schedule"/> using <see cref="Format"/>
    /// </summary>
    /// <value>non-null</value>
    /// <exception cref="CronFormatException">Parsing with <see cref="CronExpression.Parse(string, CronFormat)"/> failed</exception>
    CronExpression Expression { get; }
    /// <summary>
    /// Time zone used for computing schedule times with <see cref="CronExpression.GetNextOccurrence(DateTimeOffset, TimeZoneInfo, bool)"/>
    /// </summary>
    /// <value>non-null</value>
    TimeZoneInfo TimeZone { get; }
}

首先,scheduler.Now.DateTime不会在单元测试中使用TestScheduler为您提供实时时间。 它将根据一些预定义的开始时间为您提供虚拟时间。 您可能应该使用 AdvanceTo 将时钟初始化为与您的 crontab 测试数据相对应的内容。

对于此示例测试,这可能不是您的问题。 您的问题很可能是您在Tick的粒度下编写测试。 这很少奏效。 因为,对于TestScheduler,当一个计划的操作发生在 Tick t上,该计划操作计划另一个操作"立即"执行时,下一个操作实际上不会执行,直到 Tick t+1 。 如果该操作计划另一个操作,则在勾选t+2等之前不会执行。 因此,除非您完全了解Generate如何安排其工作,否则您希望避免编写即时报价级别的测试。

相反,请尝试以要测试的代码支持的粒度进行测试。 在这种情况下,我认为这是几分钟...因此,将测试写成前进 59 秒,看看什么也没发生,然后再前进 2 秒,看看你是否得到了你期望的。 或者,更好的是,使用TestScheduler.CreateObserver方法并只前进一次

var scheduler = new TestScheduler();
// set the virtual clock to something that corresponds with my test crontab data
scheduler.AdvanceTo(someDateTimeOffset);
// now your tests...
var cron = "* * * * *";
var observer = scheduler.CreateObserver();
var disp = ObservableCron.Cron(cron, scheduler).Subscribe(observer);
// advance the clock by 61 seconds
scheduler.AdvanceBy(TimeSpan.FromSeconds(61).Ticks);
// check the contents of the observer
Assert.IsTrue(observer.Messages.Count > 0);

相关内容

  • 没有找到相关文章

最新更新