我有两个数据源。
让我们想象:
-
系统A 提供具有更高频率的更好质量数据,例如
1price/1sec,但有时有故障,没有数据或
频率是1price/20sec - 系统B 提供较低频率的数据,例如1price/10sec
使用 System.Reactive 通常有任何优雅的方式来从系统A中检索数据,但是当它失败时(feed中没有数据(或放慢速度,以使用系统B中的数据?我想实现某种交换机,该开关将在源快的速度时使用。我不想混合来源,因此我只能一次使用Systema或SystemB。
class PriceFeed {
public IObservable<Price> GetPricesFeed(IObservable<PriceFromA> pricesFromA, IObservable<PriceFromB> pricesFromB)
{
}
private Price Convert(PriceFromA price) { //convert }
private Price Convert(PriceFromB price) { //convert }
}
有趣的问题。首先要做的是编写某种频率收集功能。看起来像这样:
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback)
{
return source.GetFrequency(measuringFreq, lookback, Scheduler.Default);
}
public static IObservable<int> GetFrequency<T>(this IObservable<T> source, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler)
{
return source.Buffer(lookback, measuringFreq, scheduler)
.Select(l => l.Count);
}
如果measuringFreq
为1秒,而lookback
为5秒,则意味着每秒我们将看到最后5秒内传递了多少消息的计数。快速而肮脏的例子:
var r = new System.Random();
var nums = Observable.Generate(
0,
i => i < 100,
i => i + 1,
i => i, _ => TimeSpan.FromSeconds(r.NextDouble() * 1)
);
var freq = nums.GetFrequency(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(5));
freq.Dump(); //Linqpad
nums
是可观察到的,应平均每半秒生成消息(它在0到1秒之间随机选择持续时间(。freq
每秒产生一个值,该值返回最近5秒内产生的消息nums
(平均应为10(。在我的机器上的最新运行中,我得到了:
11
11
12
10
12
11
9
9
10
9
8
...
一旦我们有一种获取频率的方法,就需要编写一个函数,以根据频率切换两个类似的观测值。我写了这篇文章:
public static IObservable<T> MaintainFrequencyImproper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback, IScheduler scheduler, int aAdvantage = 0)
{
var aFreq = sourceA.GetFrequency(measuringFreq, lookback, scheduler);
var bFreq = sourceB.GetFrequency(measuringFreq, lookback, scheduler);
var toReturn = aFreq.Zip(bFreq, (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? sourceB : sourceA) //If advantage is 0, and a & b both popped out 5 messages in the last second, then A wins
.StartWith(sourceA)
.Switch();
return toReturn;
}
首先,我们使用GetFrequency
的两个可观察到的频率,然后将这两个将其拉链,然后对其进行比较。如果B比A更频繁,则使用B。如果它们等效频繁或A更频繁,则使用A。
aAdvantage
变量允许您表达对超过B的更强偏好。0(默认值(表示源a赢得平局,或者更频繁时,但b赢得了。2意味着B在最近时期内必须比A多3个消息要多B,以使用B。
使用可观察到的适当的Publishing
,可以避免多个订阅,这看起来像这样:
public static IObservable<T> MaintainFrequencyProper<T>(this IObservable<T> sourceA, IObservable<T> sourceB, TimeSpan measuringFreq, TimeSpan lookback,
IScheduler scheduler, int aAdvantage = 0)
{
return sourceA.Publish(_sourceA => sourceB.Publish(_sourceB =>
_sourceA.GetFrequency(measuringFreq, lookback, scheduler)
.Zip(_sourceB.GetFrequency(measuringFreq, lookback, scheduler), (a, b) => a + aAdvantage - b)
.Select(freqDifference => freqDifference < 0 ? _sourceB : _sourceA)
.StartWith(_sourceA)
.Switch()
))
}
我希望这会有所帮助。在如何将其适合您的代码中,您没有太多。如果您愿意,请加入MCVE。