我一直在尝试建立一个断路器,可以配置为跳闸复合规则,如:
- 45秒内2次超时异常;或
- 5分钟内任何其他类型异常的50
所以我认为最好的方法是使用滑动窗口(或者时间缓冲区,随便你怎么称呼它们)。
根据我在网上找到的东西和我自己拼凑的东西,我写了这个简单的控制台应用程序:
static IEnumerable<ConsoleKeyInfo> KeyPresses()
{
ConsoleKeyInfo key;
do
{
key = Console.ReadKey();
yield return key;
} while (key.Key != ConsoleKey.Escape);
}
static IObservable<int> TimeLimitedThreshold<T>(IObservable<T> source, TimeSpan timeLimit, int threshold)
{
return source.Window(source, _ => Observable.Timer(timeLimit))
.Select(x => x.Count())
.Merge()
.Where(count => count >= threshold)
.Take(1);
}
static void Main(string[] args)
{
Console.WriteLine("Starting");
var timeLimit = TimeSpan.FromSeconds(5);
const int threshold = 3;
var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();
var thresholdHit = TimeLimitedThreshold(keys, timeLimit, threshold);
thresholdHit.Subscribe(count => Console.WriteLine("THRESHOLD BREACHED! Count is: {0}", count));
// block the main thread so we don't terminate
keys.Where(key => key.Key == ConsoleKey.Escape).FirstAsync().Wait();
Console.WriteLine("Finished");
}
(如果我应该把它放在要点或粘贴文件中,而不是放在问题中,请直接说出来)
现在这似乎做了我想要的,如果我在5秒内按任何键3次或更多次,"阈值突破!!"打印一次,没有更多的事情发生。
我的问题是:
- TimeLimitedThreshold函数是否在一个合理的抽象级别上与Rx.Net一起工作?
- 我应该注入调度使用时创建Observable.Timer()?
- 如果有的话,我错过了什么清理?涉及到Rx时的内存使用情况。. NET真的让我很困惑。
我可能会这样写阈值函数,利用Timestamp
组合子。
public static IObservable<U> TimeLimitedThreshold
<T,U>
( this IObservable<T> source
, int count
, TimeSpan timeSpan
, Func<IList<T>,U> selector
, IScheduler scheduler = null
)
{
var tmp = scheduler == null
? source.Timestamp()
: source.Timestamp(scheduler);
return tmp
.Buffer(count, 1).Where(b=>b.Count==count)
.Select(b => new { b, span = b.Last().Timestamp - b.First().Timestamp })
.Where(o => o.span <= timeSpan)
.Select(o => selector(o.b.Select(ts=>ts.Value).ToList()));
}
作为一个额外的方便,当触发器被触发时,满足触发器的完整缓冲区被提供给你的选择器函数。
例如 var keys = KeyPresses().ToObservable(Scheduler.Default).Publish().RefCount();
IObservable<string> fastKeySequences = keys.TimeLimitedThreshHold
( 3
, TimeSpan.FromSeconds(5)
, keys => String.Join("", keys)
);
给出额外的IScheduler
参数,因为Timestamp
方法有一个额外的过载,取1。这可能是有用的,如果你想有一个自定义调度程序,不跟踪时间根据内部时钟。对于测试目的,使用历史调度程序可能是有用的,然后您将需要额外的过载。
,这里是一个完全工作的测试,显示了一个时间表的使用。(使用XUnit和FluentAssertions为Should().Be(…))
public class TimeLimitedThresholdSpec : ReactiveTest
{
TestScheduler _Scheduler = new TestScheduler();
[Fact]
public void ShouldWork()
{
var o = _Scheduler.CreateColdObservable
( OnNext(100, "A")
, OnNext(200, "B")
, OnNext(250, "C")
, OnNext(255, "D")
, OnNext(258, "E")
, OnNext(600, "F")
);
var fixture = o
.TimeLimitedThreshold
(3
, TimeSpan.FromTicks(20)
, b => String.Join("", b)
, _Scheduler
);
var actual = _Scheduler
.Start(()=>fixture, created:0, subscribed:1, disposed:1000);
actual.Messages.Count.Should().Be(1);
actual.Messages[0].Value.Value.Should().Be("CDE");
}
}
订阅and的方式如下
IDisposable subscription = fastKeySequences.Subscribe(s=>Console.WriteLine(s));
,当您想要取消订阅(清理内存和资源)时,您将处理订阅。简单。
subscription.Dispose()
这里有一种替代方法,它使用单个延迟而不是缓冲区和计时器。它不给你事件——它只是在有冲突的时候发出信号——但是它使用更少的内存,因为它不会占用太多的内存。
public static class ObservableExtensions
{
public static IObservable<Unit> TimeLimitedThreshold<TSource>(
this IObservable<TSource> source,
long threshold,
TimeSpan timeLimit,
IScheduler s)
{
var events = source.Publish().RefCount();
var count = events.Select(_ => 1)
.Merge(events.Select(_ => -1)
.Delay(timeLimit, s));
return count.Scan((x,y) => x + y)
.Where(c => c == threshold)
.Select(_ => Unit.Default);
}
}
Publish().RefCount()
用于避免订阅多个源。查询将所有事件投射到1
,并将延迟的事件流投射到-1
,然后生成运行总数。如果运行总数达到阈值,我们发出一个信号(Unit.Default
是表示没有有效负载的事件的Rx类型)。下面是一个测试(仅在LINQPad中使用nuget rx-testing
运行):
void Main()
{
var s = new TestScheduler();
var source = s.CreateColdObservable(
new Recorded<Notification<int>>(100, Notification.CreateOnNext(1)),
new Recorded<Notification<int>>(200, Notification.CreateOnNext(2)),
new Recorded<Notification<int>>(300, Notification.CreateOnNext(3)),
new Recorded<Notification<int>>(330, Notification.CreateOnNext(4)));
var results = s.CreateObserver<Unit>();
source.TimeLimitedThreshold(
2,
TimeSpan.FromTicks(30),
s).Subscribe(results);
s.Start();
ReactiveAssert.AssertEqual(
results.Messages,
new List<Recorded<Notification<Unit>>> {
new Recorded<Notification<Unit>>(
330, Notification.CreateOnNext(Unit.Default))
});
}
编辑
在Matthew Finlay观察到当阈值"在下降的过程中"通过时,上面的代码也会触发,我添加了这个版本,只检查阈值在正方向上的跨越:
public static class ObservableExtensions
{
public static IObservable<Unit> TimeLimitedThreshold<TSource>(
this IObservable<TSource> source,
long threshold,
TimeSpan timeLimit,
IScheduler s)
{
var events = source.Publish().RefCount();
var count = events.Select(_ => 1)
.Merge(events.Select(_ => -1)
.Delay(timeLimit, s));
return count.Scan((x,y) => x + y)
.Scan(new { Current = 0, Last = 0},
(x,y) => new { Current = y, Last = x.Current })
.Where(c => c.Current == threshold && c.Last < threshold)
.Select(_ => Unit.Default);
}
}