是否可以对按键分组的热可观察量中的最新项目进行采样?



在 Rx.NET 中,是否可以在按键分组的热可观察量中对最新项目进行采样?

例如,如果我有一个IObservable<Price>,其中Price是:

Price 
- Key
- Bid
- Offer

假设IObservable与外部价格馈送相关联。

我是否可以检索所有最新的Price,按Key分组,使用 Rx 每 1 秒采样一次?

假设有一些可观察的source,这将返回最后一秒按键分组和采样的所有价格。

var sampled = source
.GroupBy(p => p.Key)
.SelectMany(o => o.Sample(TimeSpan.FromSeconds(1)));

如果有一些价格在最后一秒没有收到消息,则不包括在内。

如果您希望包含旧价格,这将起作用:

var sampled2 = source
.Scan(ImmutableDictionary<int, Price>.Empty, (state, p) => state.SetItem(p.Key, p))
.Replay(1)
.RefCount();
var dummySubscription = sampled2.Subscribe();
var result = Observable.Interval(TimeSpan.FromSeconds(1))
.SelectMany(_ => sampled2.Take(1).SelectMany(state => state.Values));

只需确保在使用可观察result后处理掉DummySubscription即可。

这能做到你想要的吗?

IObservable<ImmutableDictionary<string, Price>> sampled =
Observable
.Create<ImmutableDictionary<string, Price>>(o =>
{
var output = ImmutableDictionary<string, Price>.Empty;
return
source
.Do(x => output = output.SetItem(x.Key, x))
.Select(x => Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(y => output).StartWith(output))
.Switch()
.Subscribe(o);
});

热可观察量不会在内存中保留任何旧值,但您可以自己捕获每个已知键的最后价格,例如在字典中。

请参考以下示例代码。

Dictionary<string, double> _prices = new Dictionary<string, double>();
GetPrices()
.Buffer(TimeSpan.FromSeconds(1))
.Subscribe(prices =>
{
if (prices != null && prices.Count > 0)
{
var grouped = prices.GroupBy(x => x.Key);
foreach (var group in grouped)
_prices[group.Key] = group.Last().Bid;
}
//print out the last quote of each known price key
foreach (var price in _prices)
{
Console.WriteLine("Key: " + price.Key + ", last price: " + price.Value);
}
});

它应该每秒打印每个已知密钥的最后一个引号。

这是 Shlomo 想法的完善版本,即使用Scan运算符和ImmutableDictionary作为状态来维护每个键的最新值。下面的自定义运算符(SampleByKey(以特定的间隔对一系列键轴承元素进行采样。每次采样刻度时都会发出一个IDictionary<TKey, TSource>,其中包含到目前为止每个键发出的最新值。

public static IObservable<IDictionary<TKey, TSource>> SampleByKey<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
TimeSpan interval,
IEqualityComparer<TKey> keyComparer = default)
{
return source
.Scan(ImmutableDictionary.Create<TKey, TSource>(keyComparer),
(dict, x) => dict.SetItem(keySelector(x), x))
.Publish(published => Observable
.Interval(interval)
.WithLatestFrom(published, (_, dict) => dict)
.TakeUntil(published.LastOrDefaultAsync()));
}

使用示例:

IObservable<IDictionary<string, Price>> sampled = priceFeed
.SampleByKey(p => p.Key, TimeSpan.FromSeconds(1.0));

如果source在两次采样期间发出了零元素,则将连续发出相同的字典。

此实现与我之前在问题中发布的关于如何以动态可变的时间间隔对序列进行采样的实现非常相似。

注意:我删除了我以前的实现(修订版 1(,因为它太复杂并且可能泄漏。Shlomo的方法更容易理解,并根据需要进行修改。

最新更新