How do I get the latest value for each unique key?
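In this sample I have stock tickers and prices. How do I combine them so that the output contains each distinct stock with its latest price?
In real life this stream of events will run for years, and I only need the current price of each stock.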

    using System;
    using System.Reactive.Concurrency;
    using System.Reactive.Linq;

    namespace LearnRx
    {
        class Program
        {
            public class Stock
            {
                public string Symbol { get; set; }
                public float Price { get; set; }
                public DateTime ModifiedAt { get; set; }

                public override string ToString()
                {
                    return "Stock: " + Symbol + " " + Price + " " + ModifiedAt;
                }
            }

            static void Main(string[] args)
            {
                var baseTime = new DateTime(2000, 1, 1, 1, 1, 1);
                var fiveMinutes = TimeSpan.FromMinutes(5);

                // Sample events; MSFT appears twice with different prices.
                var stockEvents = new Stock[]
                {
                    new Stock
                    {
                        Symbol = "MSFT",
                        Price = 100,
                        ModifiedAt = baseTime
                    },
                    new Stock
                    {
                        Symbol = "GOOG",
                        Price = 200,
                        ModifiedAt = baseTime + fiveMinutes
                    },
                    new Stock
                    {
                        Symbol = "MSFT",
                        Price = 150,
                        ModifiedAt = baseTime + fiveMinutes + fiveMinutes
                    },
                    new Stock
                    {
                        Symbol = "AAPL",
                        Price = 300,
                        ModifiedAt = baseTime + fiveMinutes + fiveMinutes + fiveMinutes
                    },
                }.ToObservable();

                // Virtual-time scheduler so each event is replayed at its ModifiedAt timestamp.
                var scheduler = new HistoricalScheduler();

                var replay = Observable.Generate(
                    stockEvents.GetEnumerator(),
                    events => events.MoveNext(),
                    events => events,
                    events => events.Current,
                    events => events.Current.ModifiedAt,
                    scheduler);

                replay
                    .Subscribe(i => Console.WriteLine($"Event: {i} happened at {scheduler.Now}"));

                scheduler.Start();
            }
        }
    }

It sounds like you want a GroupBy followed by a CombineLatest that ought to exist but doesn't.
Here is the missing CombineLatest:

    using System;
    using System.Collections.Immutable;
    using System.Reactive;
    using System.Reactive.Linq;

    public static class X
    {
        public static IObservable<ImmutableDictionary<TKey, TValue>> CombineLatest<TKey, TValue>(this IObservable<IGroupedObservable<TKey, TValue>> source)
        {
            return source
                // Tag every notification from every group with that group's key.
                .SelectMany(o => o.Materialize().Select(n => (Notification: n, Key: o.Key)))
                // Fold the notifications into a dictionary holding the latest value per key.
                .Scan(default(Notification<ImmutableDictionary<TKey, TValue>>), (stateNotification, t) => {
                    var state = stateNotification?.Value ?? ImmutableDictionary<TKey, TValue>.Empty;
                    switch (t.Notification.Kind) {
                        case NotificationKind.OnError:
                            // An error in any group terminates the combined stream.
                            return Notification.CreateOnError<ImmutableDictionary<TKey, TValue>>(t.Notification.Exception);
                        case NotificationKind.OnCompleted:
                            // A completed group drops its key from the dictionary.
                            return Notification.CreateOnNext(state.Remove(t.Key));
                        case NotificationKind.OnNext:
                            return Notification.CreateOnNext(state.SetItem(t.Key, t.Notification.Value));
                        default:
                            throw new NotImplementedException();
                    }
                })
                .Dematerialize();
        }
    }

Then the final query in Main becomes:

    var prices = replay
        .GroupBy(s => s.Symbol)
        .CombineLatest();

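This gives you an IObservable<ImmutableDictionary<string, Stock>> that emits a new dictionary on every price update. From there you can do whatever you need downstream.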
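If removing a key when its group completes is not needed, the same "latest value per key" snapshot can probably be built with a single Scan over the flat stream instead of GroupBy plus the custom operator. The following is a minimal sketch of that alternative, not the approach given above; the `Quote` and `LatestPriceSketch` names are hypothetical, and it assumes the System.Reactive and System.Collections.Immutable packages are referenced.

```csharp
using System;
using System.Collections.Immutable;
using System.Linq;
using System.Reactive.Linq;

// Hypothetical, simplified stand-in for the Stock class in the question.
public class Quote
{
    public string Symbol { get; set; }
    public float Price { get; set; }
}

public static class LatestPriceSketch
{
    public static void Main()
    {
        var quotes = new[]
        {
            new Quote { Symbol = "MSFT", Price = 100 },
            new Quote { Symbol = "GOOG", Price = 200 },
            new Quote { Symbol = "MSFT", Price = 150 },
            new Quote { Symbol = "AAPL", Price = 300 },
        }.ToObservable();

        // Fold each quote into an immutable snapshot keyed by symbol;
        // a later quote for the same symbol replaces the earlier one.
        var latest = quotes.Scan(
            ImmutableDictionary<string, Quote>.Empty,
            (state, quote) => state.SetItem(quote.Symbol, quote));

        // Print one line per snapshot, with symbols sorted for stable output.
        latest.Subscribe(snapshot =>
            Console.WriteLine(string.Join(", ",
                snapshot.OrderBy(kv => kv.Key)
                        .Select(kv => $"{kv.Key}={kv.Value.Price}"))));
    }
}
```

Each incoming quote yields one snapshot, so the final line printed should list all three symbols, with MSFT at its second price. Unlike the GroupBy version, keys are never removed here, so the dictionary grows with the number of distinct symbols seen.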