System.Reactive-动态切换频率



我有两个数据源。

让我们想象:

  • 系统A 提供具有更高频率的更好质量数据,例如
    1price/1sec,但有时有故障,没有数据或
    频率是1price/20sec
  • 系统B 提供较低频率的数据,例如1price/10sec

使用 System.Reactive 通常有任何优雅的方式来从系统A中检索数据,但是当它失败时(feed中没有数据(或放慢速度,以使用系统B中的数据?我想实现某种交换机,该开关将在源快的速度时使用。我不想混合来源,因此我只能一次使用Systema或SystemB。


    class PriceFeed {
        public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
        {

        }

        private Price Convert(PriceFromA price) { //convert }
        private Price Convert(PriceFromB price) { //convert }
    }

有趣的问题。首先要做的是编写某种频率收集功能。看起来像这样:

public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
    return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
    return source.Buffer(lookback, measuringFreq, scheduler)
        .Select(l => l.Count);
}

如果measuringFreq为1秒,而lookback为5秒,则意味着每秒我们将看到最后5秒内传递了多少消息的计数。快速而肮脏的例子:

var r = new System.Random();
var nums = Observable.Generate(
    0, 
    i => i < 100, 
    i => i + 1, 
    i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad

nums是可观察到的,应平均每半秒生成消息(它在0到1秒之间随机选择持续时间(。freq每秒产生一个值,该值返回最近5秒内产生的消息nums(平均应为10(。在我的机器上的最新运行中,我得到了:

11
11
12
10
12
11
9
9
10
9
8
...

一旦我们有一种获取频率的方法,就需要编写一个函数,以根据频率切换两个类似的观测值。我写了这篇文章:

public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
    var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
    var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
    var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
        .Select(freqDifference => freqDifference < 0 ? sourceB : sourceA)   //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
        .StartWith(sourceA)
        .Switch();
    return toReturn;
}

首先,我们使用GetFrequency的两个可观察到的频率,然后将这两个将其拉链,然后对其进行比较。如果B比A更频繁,则使用B。如果它们等效频繁或A更频繁,则使用A。

aAdvantage变量允许您表达对超过B的更强偏好。0(默认值(表示源a赢得平局,或者更频繁时,但b赢得了。2意味着B在最近时期内必须比A多3个消息要多B,以使用B。

使用可观察到的适当的Publishing,可以避免多个订阅,这看起来像这样:

public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, 
    IScheduler scheduler, int aAdvantage = 0)
{
    return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB => 
        _sourceA.GetFrequency(measuringFreq, lookback, scheduler)
            .Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
            .Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
            .StartWith(_sourceA)
            .Switch()
    ))
}

我希望这会有所帮助。在如何将其适合您的代码中,您没有太多。如果您愿意,请加入MCVE。

最新更新