给定 2 个热门可观察量 t1 和 t2,我将如何 GoupJoin,以便我从 t2 中获取在 t1 中每个事件之前 x 秒和之后 y 秒发生的所有事件?
鉴于:
t1 -----A-----B-----C
T2 --1--2--3--4--5--6
如果 t1 相隔 2 秒,t2相隔 1 秒,并且我们正在寻找每个 t1 事件两侧各 1 秒的 t2 事件,则结果如下。
结果:
{ A, [1,2,3] }
{ B, [3,4,5] }
{ C, [5,6] }
以下是真实示例,我们需要解决上述问题。 我们有一连串电子邮件和另一股短信。我们需要发出另一个流结果,该结果具有电子邮件,并且短信发生在电子邮件发送时间的 1 分钟之前或之后。
这里的问题(正如 Shlomo 提到的)是我们需要在t1
事件发生之前打开t2
中的窗口。不幸的是,这是不可能的,因为一旦我们到达t1
的事件,我们已经超过了我们需要打开窗口的点t2
.
相反,我们可以做的是使用 Delay() 及时t2
向前移动。如果我们将其偏移x
(之前的时间),我们可以将问题重新构建为"获取在t1
打开并在t1 + x + y
关闭的窗口中发生的t2
事件。我们可以使用 GroupJoin 来解决这个问题。
var scheduler = new HistoricalScheduler();
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
.Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var delayedT2 = t2.Delay(x, scheduler);
var g = t1.GroupJoin(delayedT2 ,
_ => Observable.Timer(x + y, scheduler),
_ => Observable.Empty<Unit>(scheduler),
(a, b) => new { a, b}
);
scheduler.Start();
这给出了结果:
{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }
这个结果仍然不是你所期望的。这是因为在您的示例中t2
事件发生在与事件完全相同的时刻t1
。在这种情况下,首先处理t1 + y
事件,并在包含t2
事件之前关闭窗口。 这意味着我们有效地获得了(t1-01:00) <= t1 < (t1 + 01:00)
。例如,A
的窗口是 01:0000 - 02.9999...这就是为什么不包括在03:00发生的3
。
这可以通过简单地在我们的y
时间中添加一个即时报价来修复为包含
var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1));
代码转储答案(使用 100 毫秒代替 1 秒):
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(l => (char)('A' + l))
.Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Delay(TimeSpan.FromMilliseconds(100));
var x = TimeSpan.FromMilliseconds(100); //before time
var y = TimeSpan.FromMilliseconds(100); //after time
var g = t1.Timestamp().Join(t2.Timestamp(),
c => Observable.Timer(y),
i => Observable.Timer(x + y),
(c, i) => new {GroupItem = c, RightItem = i}
)
.Where(a =>
(a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
|| (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
)
.Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
.GroupBy(a => a.GroupItem, a => a.RightItem);
说明:Join
都是关于"窗户"的。因此,在定义联接时,必须考虑从左可观察和右可观察开始为每个项目打开的时间窗口。不过,我们这里的窗口很难弄清楚:我们必须以某种方式在它发生之前为左可观察的 X 时间打开一个窗口,然后在它发生后关闭它的 Y 时间。
而不是做不可能的事情,所以我们只在左项发生后打开 Y 时间,让右项窗口由 X + Y 时间定义。但是,这将给我们留下不应包含的项目。因此,我们在时间戳上使用Where
来过滤掉它们。
最后,我们选择匿名类型和时间戳,并将其全部组合在一起。
我不认为GroupJoin
是要走的路:你最终会拆散这个团体并重建它,就像我所做的一样。