我以每秒 150-200 次更新的速度获得更新事件。我想将其与每个键 1 秒混为一谈。
示例:在 1 秒内,我收到 3 个键 A、B、C 的更新,顺序为:
A1、B1、C1、A2、A3、B2我想每 1 秒处理一次此更新,并仅处理上面示例中的 A3、B2 和 C1。
如何使用反应式扩展执行此操作? 到目前为止,我尝试了:
Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.GroupBy(x => x.EventArgs.Key)
.Subscribe(g =>
{
g.Sample(TimeSpan.FromSeconds(1))
.Subscribe(x1 =>
{
updateSubject.OnNext(key);
});
});
当然不是我所期望的。请为此提出正确的方法。
你想要的更像这样:
Observable
.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.GroupBy(x => x.EventArgs.Key)
.Select(g => g.Sample(TimeSpan.FromSeconds(1.0)))
.Merge()
.Subscribe(x =>
{
updateSubject.OnNext(key);
});
但是,在您的.Subscribe
内安装updateSubject.OnNext(key);
是一个非常糟糕的主意。您确实应该展示更多的代码,以便我们就如何正确处理它提供建议。
在那之后我就没有太多事情要做
Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.GroupBy(x => x.EventArgs.Key)
.Subscribe(g =>
{
g.Sample(TimeSpan.FromSeconds(1))
.Subscribe(x1 =>
{
updateSubject.OnNext(key);
});
});
updateSubject
.SubscribeOn(NewThreadScheduler.Default)
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(EventHandler); //Event Handler is the what gets called to handle the events
合并没有为我做这件事,但是我尝试了:
Observable.FromEventPattern<EventArgs>(_listener, "EventHandler", System.Reactive.Concurrency.NewThreadScheduler.Default)
.Distinct(x => x.EventArgs.Key)
.Sample(TimeSpan.FromSeconds(1))
.Subscribe(x1 =>
{
updateSubject.OnNext(x1.EventArgs.NewValue);
});
不确定我是否在这里做。
下面是一个自定义SampleLatestByKey
运算符,它可能会执行您想要的操作:
/// <summary>
/// Samples a sequence of key-bearing elements at a specific interval. Upon each
/// sampling tick an IDictionary<TKey, TSource> is emitted, containing the latest
/// values that were emitted by each key during the last sampling period.
/// </summary>
public static IObservable<IDictionary<TKey, TSource>> SampleLatestByKey<TSource, TKey>(
this IObservable<TSource> source,
Func<TSource, TKey> keySelector,
TimeSpan interval,
IEqualityComparer<TKey> keyComparer = default)
{
return source
.Window(interval)
.SelectMany(window => window
.Aggregate(new Dictionary<TKey, TSource>(keyComparer),
(dict, x) => { dict[keySelector(x)] = x; return dict; }));
}
使用示例:
Observable
.FromEventPattern<EventArgs>(_listener, "EventHandler")
.SampleLatestByKey(x => x.EventArgs.Key, TimeSpan.FromSeconds(1.0))
.Subscribe(dict => dict.Keys.ToObservable().Subscribe(updateSubject));
此问题中存在名为SampleByKey
的类似自定义运算符。这两个运算符之间的区别在于,SampleByKey
在可观察序列的整个生存期内为每个键发出最新值。SampleLatestByKey
仅传播在上一个采样间隔内发出的值。