如何使用 Rx.Net 来处理突发事件或垃圾事件?



我正在编写代码来处理来自设备的事件,在这些设备中,所述事件可能会突发发生,这是常见情况,或者在错误情况下可能非常频繁地发生。

我想要的行为是:

  • 切勿在给定时间范围内多次转发事件。(1 分钟(
  • 在一般情况下,不要拆分事件突发(通常几秒钟(,等待静默期后再转发
  • 在错误情况下,如果正在生成事件,但在 2 倍的时间窗口内没有转发任何事件,请转发最后一个事件

我发现 Throttle 通过等待一段时间然后发送最后一个事件来按照我想要的方式工作突发。但是,如果事件是垃圾邮件,则 Throttle 从不转发任何事件,因为静默期窗口会反复重置。

我发现 Sample 效果很好,除了在时间窗口结束时发生突发时,因为我对突发中间发生的事件不感兴趣。

我知道这可以通过使用 Switch 或 Join 来解决,但我还没有找到足够接近我的场景的示例来为我单击它。

using System;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Reactive.Concurrency;
using System.Diagnostics;
public class Program
{
public static event EventHandler<FakeEventArgs> DeviceChange;
public static TimeSpan window = TimeSpan.FromSeconds(60);
public static uint eventCounter = 0;
public static async Task Main()
{
var window2x = window + window;
//Just to give a visual sense of when things are happening
Observable.Interval(window).Subscribe(iterator => Console.WriteLine($"Non-sliding Window {iterator}"));
//Create observable from standard event in order to use Rx.
var eventsAsObservables = Observable.FromEventPattern<FakeEventArgs>
(
handler => DeviceChange += handler,
handler => DeviceChange -= handler
);
//pure throttle doesn't work in the case where events always firing faster than the time window (i.e. device with faulty connection)
//eventsAsObservables
//  .Throttle(window)
//  .ObserveOn(ThreadPoolScheduler.Instance)
//  .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });
//pure sample doesn't work in the case where clusters of events are happening across the time window boundary (i.e. device unplugged right at time window)
//eventsAsObservables
//  .Sample(window)
//  .ObserveOn(ThreadPoolScheduler.Instance)
//  .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });
var throttled = eventsAsObservables.Throttle(window);
var sampled = eventsAsObservables.Sample(window2x);
//plain merge will forward extra events to subscribers
//throttled
//  .Merge(sampled)
//  .ObserveOn(ThreadPoolScheduler.Instance)
//  .Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });
//How do I alter this to get the desired behavior?
throttled
.Select(selector => sampled)
.Switch()
.ObserveOn(ThreadPoolScheduler.Instance)
.Subscribe(evt => { var now = DateTime.Now; Console.WriteLine($"Event: {evt.EventArgs.Message} Sent at: {evt.EventArgs.Created.TimeOfDay}  Handled at: {now.TimeOfDay} Elapsed: {(now - evt.EventArgs.Created).TotalSeconds}"); });
Console.WriteLine($"About to start raising events {DateTime.Now}");
//RaiseEvent($"{++eventCounter}");
//These events occur very frequently
//They cause Throttle to never forward anything because the quiet timer gets reset
StartSpammyEventsAsync(100);
//These events will burst on the time boundary 
//Causes Throttle to never forward event because the quiet timer gets reset just before it expires
//Causes Sample to forward event from the middle of the burst instead of the end
StartBurstyEventsAsync(window);
Console.WriteLine("nPress ENTER to exit...n");
Console.ReadLine();
}
static void RaiseEvent(string eventedMessage) =>
DeviceChange?.Invoke(null, new FakeEventArgs(eventedMessage));
static async Task StartSpammyEventsAsync(int milliSeconds)
{
while (true)
{
await Task.Delay(milliSeconds).ConfigureAwait(false);
Debug.WriteLine($"Raising event {eventCounter}");
RaiseEvent($"{++eventCounter}");
}
}
static async Task StartBurstyEventsAsync(TimeSpan window)
{
while (true)
{
await Task.Delay(window - TimeSpan.FromSeconds(1)).ConfigureAwait(false);
//two second burst of events
var start = DateTime.Now;
var limit = TimeSpan.FromSeconds(2);
while (DateTime.Now - start < limit)
{
await Task.Delay(100).ConfigureAwait(false);
Debug.WriteLine($"Raising event {eventCounter}");
RaiseEvent($"{++eventCounter}");
}
}
}
public class FakeEventArgs : EventArgs
{
public readonly string Message;
public readonly DateTime Created;
protected FakeEventArgs() { }
public FakeEventArgs(string message):
base()
{
Created = DateTime.Now;
Message = message;
}
}
} 

混合这两种方法怎么样?这是一个ThrottleUntil扩展方法,它使用Throttle等待"突发"结束,并在"突发"未结束时Interval发出项目(也称为"垃圾邮件"(:

public static IObservable<T> ThrottleUntil<T>(this IObservable<T> source, TimeSpan window, IScheduler scheduler)
{
var throttle = source.Throttle(window, scheduler);
var until = Observable.Interval(window * 2, scheduler).Select(_ => default(T));
return source
.Buffer(() => Observable.Merge(throttle, until).Take(1))
.SelectMany(buffer => buffer.Any() ? buffer.TakeLast(1) : Enumerable.Empty<T>());
}

以下是一些测试用例(带有弹珠图,对于表达您想要发生的事情非常有用(,它们显示了它是如何工作的:

[TestFixture]
public class BurstySpammy
{
private static long SubscriptionOffset = ReactiveTest.Subscribed;
private static long TestOffset = ReactiveTest.Created + ReactiveTest.Subscribed;
private static IEnumerable<TestCaseData> TestCases
{
get
{
// Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
// Source:   ----1---2---------------3---4-------------------5---------------------------- 
// Expected: --------------------2-------------------4-----------------------5------------
yield return new TestCaseData(new int[] { 1, 2, 6, 7, 13 }, new[] { 5, 10, 16 }, false).SetName("ShouldEmitFromBurst");
// Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
// Source:   ----1-------2-------3-------4-------5-------6-------------------------------- 
// Expected: ------------------------3-----------------------6----------------------------
yield return new TestCaseData(new int[] { 1, 3, 5, 7, 9, 11 }, new[] { 6, 12 }, true).SetName("ShouldEmitFromSpam");
// Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
// Source:   ----1---2---------------3---4---5-------6-------7---8---9-------------------- 
// Expected: --------------------2-----------------------7-----------------------9--------
yield return new TestCaseData(new int[] { 1, 2, 6, 7, 8, 10, 12, 13, 14 }, new[] { 5, 11, 17 }, false).SetName("ShouldEmitFromMix");
// Time:     0---1---2---3---4---5---6---7---8---9---0---1---2---3---4---5---6---7---8---9
// Source:   ----1------------------------------------------------------------------------ 
// Expected: ----------------1------------------------------------------------------------
yield return new TestCaseData(new int[] { 1 }, new[] { 4 }, false).SetName("ShouldNotEmitEmpty");
}
}
[TestCaseSource(nameof(TestCases))]
public void ShouldEmitCorrectly(int[] sourceTimes, int[] expectedTimes, bool basedOnSubscriptionTime)
{
var scheduler = new TestScheduler();
var source = sourceTimes
.Select((time, index) => new Recorded<Notification<int>>(TimeSpan.FromSeconds(time).Ticks, Notification.CreateOnNext(index)))
.ToArray();
var expected = expectedTimes
.Select(time => (Time: basedOnSubscriptionTime ? TimeSpan.FromSeconds(time).Ticks + SubscriptionOffset : TimeSpan.FromSeconds(time).Ticks, Value: source.Last(r => r.Time <= TimeSpan.FromSeconds(time).Ticks).Value.Value))
.Select(tuple => new Recorded<Notification<int>>(tuple.Time, Notification.CreateOnNext(tuple.Value)))
.ToArray();
var xs = scheduler
.CreateHotObservable(source)
.ThrottleUntil(TimeSpan.FromSeconds(3), scheduler);
var observed = scheduler.Start(() => xs, TimeSpan.FromSeconds(20).Ticks + TestOffset);
CollectionAssert.AreEqual(expected, observed.Messages);
}
}

希望对您有所帮助。

最新更新