我的目标是为"Type2"对象每5秒调用一个更新处理程序不超过一次。这个可观察对象每5秒会生成不止一个值,但是我想忽略在最后一次处理的更新后5秒内发生的所有值。
我在这里问了这个问题:仅在满足特定条件时节流
并得到了很好的反馈。它引导我使用Observable。窗口努力实现我的目标。我以为我有它的工作,但事实证明,它可以产生不正确的输出,如果第一次更新来之前,一个窗口关闭(所以更新处理),然后一旦下一个窗口打开,另一个更新到达,也处理,当我不希望它,因为它来在5秒内最后处理的更新。
下面是一些代码来演示这个问题,稍微修改了链接中的代码:
var source = new Subject<Thing>();
var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
.Where(t => t.ActivationType == "Type2")
.Window(() =>
Observable.Timer(TimeSpan.FromSeconds(5))
.Do(t => Console.WriteLine("nTICK: " + DateTime.Now.ToString("hh:mm:ss:fff"))))
.Select(x => x.Take(1))
.Merge()
.Do(t => Console.WriteLine("A new window opened " + DateTime.Now.ToString("hh:mm:ss:fff")));
var query = ofType1.Merge(ofType2);
query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));
int msDelay = 3000;
Task task = Task.Factory
.StartNew(() => { Thread.Sleep(msDelay); })
.ContinueWith((Task starter) =>
{
while (running)
{
var thing = new Thing(); //Note that all Things are by default Type2
source.OnNext(thing);
Thread.Sleep(100);
}
}, TaskContinuationOptions.LongRunning);
Console.ReadLine();
所以,订阅完成了,一旦订阅完成,Observable。窗口中使用的计时器开始。用于生成值的while循环直到延迟3000 ms后才开始。
输出如下:
A new window opened 03:48:03:725
UPDATE: 1ac54fb3-f73d-4840-b4d8-95d4250ce65d 03:48:03:752
TICK: 03:48:05:714
A new window opened 03:48:05:754
UPDATE: 12d36e53-010f-4ccd-b9f8-2951b085f88c 03:48:05:754
TICK: 03:48:10:730
A new window opened 03:48:10:755
UPDATE: 25d84e72-94f9-4f50-83f4-14c1004c10fa 03:48:10:755
TICK: 03:48:15:738
A new window opened 03:48:15:755
UPDATE: 5f32b7d5-196f-445c-bf25-5c362b2fd6f0 03:48:15:755
TICK: 03:48:20:747
A new window opened 03:48:20:756
UPDATE: e3a3a30d-8031-41b5-b115-499dbe91aaf7 03:48:20:756
TICK: 03:48:25:755
A new window opened 03:48:25:756
UPDATE: 239fb25b-5135-463b-bf7e-5728ffa07f5c 03:48:25:756
如您所见,第一个Type2更新是在窗口打开时进入的,因此它得到了处理。然后,2秒后,窗口的计时器滴答作响,并打开一个新窗口。它立即处理下一个Type2更新,这是我不希望它做的。之后,它看起来正常工作(每5秒更新一次,在窗口声明中定义)。
是否有一种方法或另一种方法,我可以使用,以确保只有一个更新每5秒(或任何我选择的时间框架)被处理?
我想我有一个解决方案,但我能先提一些建议吗?我认为这个问题中有很多杂音,很难发现真正的问题是什么。
实际上你是在问"我如何才能得到一个值,然后至少沉默5秒"。Type1
代码是一种干扰。序列的生成也让人分心。
那么让我们清理示例代码,看看我们是否可以看到树木的木材:
首先,我不认为这是完全相关的类型是通过。在您的示例中,我们从不推入Type1
,所以我们只使用整数代替。这可能会使它更容易。
接下来,我们可以通过使用一个Observable来清理创建。定时器代替大循环+任务+线程。睡眠的东西。
现在我们有一个简单的起始位置:
var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool);
var feed = source.Publish().RefCount();
所以我们的第一个问题是对你正在使用的窗口重载的误解。我想你的期望是,当第一个值被推入时,它将打开一个窗口。事实并非如此。计时器(即Observable.Timer(TimeSpan.FromSeconds(5)))在每次窗口打开时被订阅,这是最初的订阅发生时,然后在窗口本身关闭时再次订阅。因此,计时器立即启动,您只会在第一个窗口获得2秒的值。
接下来,我要画出我的问题空间。我最喜欢的方法是用大理石图表。它们在ASCII中翻译得不太好,但让我们试一试。
给定这个输入序列:
//Seconds 1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths 01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
//
//source : ------------------------------0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0--1--2--3--4--5--6--7--8--9--0
这应该表示3.0秒产生的值'0'。然后是3.3秒的"1"等等。
现在有了一点清晰的问题空间,我们可以画出我们认为窗口应该在哪里打开,应该在哪里关闭,以及下一个窗口应该在哪里打开。
让我们看看我们想要什么。
这里我们添加了一个Window1 (W1),当第一个值被按下('1' @3.0)时打开。5秒后关闭。在这个窗口中,我们希望在第一个值1之后保持沉默。
窗口2 (W2)应该在产生下一个值时打开,而不是在最后一个窗口关闭后立即打开(我认为?!)。在这里,我们看到当值'17'在8.4秒的时候被打开。
//Seconds 1111111111222222222233333333334444444444555555555566666666667777777777888888888899999999990000000000
//Tenths 01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890
//source : ------------------------------0--1--2--3--4--5--6--7--8--9--1--1--1--1--1--1--1--1--1--1--2--2--2--2--2--2--2--2--2--2--3--3--3--3--3--3--3--3--3--3--4--4--4--4-
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3
//
//W1 : 0-------------------------------------------------|
//W2 : (17)----------------------------------------------|
//W3 : (34)------------------------>
//expected: ------------------------------0--------------------------------------------------1--------------------------------------------------3---------------------------
7 4
现在我们知道了要查找的值,就可以构造查询了。
我想出了这个。我假设提要实际上是一个Hot序列。使用这个假设,我构造了一个重复的结构,它从提要中获取1个值,并将5秒的沉默连接到序列中。然后我添加Repeat操作符,它只会在5秒静默过后重新订阅提要。
public static IObservable<T> Silencer<T>(this IObservable<T> source, TimeSpan minSilencePeriod)
{
return source.Take(1)
.Concat(Observable.Empty<T>().Delay(minSilencePeriod))
.Repeat();
}
这将生成0、17、51等值。
现在将其应用到原始问题的代码中(清理一些东西)
void Main()
{
var source = Observable.Timer(TimeSpan.FromSeconds(3),TimeSpan.FromMilliseconds(300), Scheduler.TaskPool).Select(_=>new Thing());
var feed = source.Publish().RefCount();
var ofType1 = feed.Where(t => t.ActivationType == "Type1");
var ofType2 = feed
.Where(t => t.ActivationType == "Type2")
.Silencer(TimeSpan.FromSeconds(5));
var query = ofType1.Merge(ofType2);
var subscription = query.Subscribe(t => Console.WriteLine("UPDATE: " + t.ID + " " + DateTime.Now.ToString("hh:mm:ss:fff")));
Console.ReadLine();
subscription.Dispose();
}
我们看到至少间隔5秒的输出值
UPDATE: 3f0fc6f3-8a5a-476f-9661-b7330ab77877 09:14:04:725
UPDATE: fc8f0025-7a79-4329-8164-b8b421ad5865 09:14:09:817
UPDATE: ad739a71-885e-4d5b-a352-2302df0a4d87 09:14:14:925