使用Rx,如何获得每个唯一密钥的最新值



如何获得每个唯一键的最新值?

在这个样本中,我有股票行情和价格。我如何将其组合,使其输出具有最新价格的独特股票?

在现实生活中,这一连串的事件将持续数年,我只需要一只股票的当前价格。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace LearnRx
{
class Program
{
public class Stock
{
public string Symbol { get; set; }
public float Price { get; set; }
public DateTime ModifiedAt { get; set; }
public override string ToString()
{
return "Stock: " + Symbol + " " + Price + " " + ModifiedAt;
}
}
static void Main(string[] args)
{
var baseTime = new DateTime(2000, 1, 1, 1, 1, 1);
var fiveMinutes = TimeSpan.FromMinutes(5);
var stockEvents = new Stock[]
{
new Stock
{
Symbol = "MSFT",
Price = 100,
ModifiedAt = baseTime
},
new Stock
{
Symbol = "GOOG",
Price = 200,
ModifiedAt = baseTime + fiveMinutes
},
new Stock
{
Symbol = "MSFT",
Price = 150,
ModifiedAt = baseTime + fiveMinutes + fiveMinutes
},
new Stock
{
Symbol = "AAPL",
Price = 300,
ModifiedAt = baseTime + fiveMinutes + fiveMinutes + fiveMinutes
},
}.ToObservable();
var scheduler = new HistoricalScheduler();
var replay = Observable.Generate(
stockEvents.GetEnumerator(),
events => events.MoveNext(),
events => events,
events => events.Current,
events => events.Current.ModifiedAt,
scheduler);

replay
.Subscribe( i => Console.WriteLine($"Event: {i} happened at {scheduler.Now}"));
scheduler.Start();
}
}
}

听起来你想要一个GroupBy,后面跟着一个应该存在但不存在的CombineLatest

这是丢失的CombineLatest:

public static class X
{

public static IObservable<ImmutableDictionary<TKey, TValue>> CombineLatest<TKey, TValue>(this IObservable<IGroupedObservable<TKey, TValue>> source)
{
return source
.SelectMany(o => o.Materialize().Select(n => (Notification: n, Key: o.Key)))
.Scan(default(Notification<ImmutableDictionary<TKey, TValue>>), (stateNotification, t) => {
var state = stateNotification?.Value ?? ImmutableDictionary<TKey, TValue>.Empty;
switch(t.Notification.Kind) {
case NotificationKind.OnError:
return Notification.CreateOnError<ImmutableDictionary<TKey, TValue>>(t.Notification.Exception);
case NotificationKind.OnCompleted:
return Notification.CreateOnNext(state.Remove(t.Key));
case NotificationKind.OnNext:
return Notification.CreateOnNext(state.SetItem(t.Key, t.Notification.Value));
default:
throw new NotImplementedException();
}
})
.Dematerialize();
}
}

然后是Main:中的最后一个查询

var prices = replay
.GroupBy(s => s.Symbol)
.CombineLatest()

这将为每个价位提供一个Observable<Dictionary<string, Stock>>。这将使您能够在下游执行任何操作。

最新更新