Rx.Net 组连接两个具有连接条件的时间的可观察量



给定 2 个热门可观察量 t1 和 t2,我将如何 GoupJoin,以便我从 t2 中获取在 t1 中每个事件之前 x 秒和之后 y 秒发生的所有事件?

鉴于:

t1 -----A-----B-----C

T2 --1--2--3--4--5--6

如果 t1 相隔 2 秒,t2相隔 1 秒,并且我们正在寻找每个 t1 事件两侧各 1 秒的 t2 事件,则结果如下。

结果:

{ A, [1,2,3] }

{ B, [3,4,5] }

{ C, [5,6] }

以下是真实示例,我们需要解决上述问题。 我们有一连串电子邮件和另一股短信。我们需要发出另一个流结果,该结果具有电子邮件,并且短信发生在电子邮件发送时间的 1 分钟之前或之后。

这里的问题(正如 Shlomo 提到的)是我们需要在t1事件发生之前打开t2中的窗口。不幸的是,这是不可能的,因为一旦我们到达t1的事件,我们已经超过了我们需要打开窗口的点t2.

相反,我们可以做的是使用 Delay() 及时t2向前移动。如果我们将其偏移x(之前的时间),我们可以将问题重新构建为"获取在t1打开并在t1 + x + y关闭的窗口中发生的t2事件。我们可以使用 GroupJoin 来解决这个问题。

var scheduler = new HistoricalScheduler();
var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200), scheduler)
.Select(l => (char)('A' + l));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100), scheduler);
var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time
var delayedT2 = t2.Delay(x, scheduler);
var g = t1.GroupJoin(delayedT2 ,
_ => Observable.Timer(x + y, scheduler),
_ => Observable.Empty<Unit>(scheduler),
(a, b) => new { a, b}
);
scheduler.Start();

这给出了结果:

{ A, [1,2] }
{ B, [3,4] }
{ C, [5,6] }

这个结果仍然不是你所期望的。这是因为在您的示例中t2事件发生在与事件完全相同的时刻t1。在这种情况下,首先处理t1 + y事件,并在包含t2事件之前关闭窗口。 这意味着我们有效地获得了(t1-01:00) <= t1 < (t1 + 01:00)。例如,A的窗口是 01:0000 - 02.9999...这就是为什么不包括在03:00发生的3

这可以通过简单地在我们的y时间中添加一个即时报价来修复为包含

var y = TimeSpan.FromMilliseconds(100).Add(TimeSpan.FromTicks(1)); 

代码转储答案(使用 100 毫秒代替 1 秒):

var t1 = Observable.Interval(TimeSpan.FromMilliseconds(200))
.Select(l => (char)('A' + l))
.Delay(TimeSpan.FromMilliseconds(200));
var t2 = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Delay(TimeSpan.FromMilliseconds(100));
var x = TimeSpan.FromMilliseconds(100);     //before time
var y = TimeSpan.FromMilliseconds(100);     //after time
var g = t1.Timestamp().Join(t2.Timestamp(),
c => Observable.Timer(y),
i => Observable.Timer(x + y),
(c, i) => new {GroupItem = c, RightItem = i}
)
.Where(a =>
(a.GroupItem.Timestamp > a.RightItem.Timestamp && a.GroupItem.Timestamp - a.RightItem.Timestamp <= x) //group-item came first
|| (a.GroupItem.Timestamp <= a.RightItem.Timestamp && a.RightItem.Timestamp - a.GroupItem.Timestamp <= y) // right-item came first, or exact timestamp match
)
.Select(a => new { GroupItem = a.GroupItem.Value, RightItem = a.RightItem.Value })
.GroupBy(a => a.GroupItem, a => a.RightItem);

说明:Join都是关于"窗户"的。因此,在定义联接时,必须考虑从左可观察和右可观察开始为每个项目打开的时间窗口。不过,我们这里的窗口很难弄清楚:我们必须以某种方式在它发生之前为左可观察的 X 时间打开一个窗口,然后在它发生后关闭它的 Y 时间。

而不是做不可能的事情,所以我们只在左项发生后打开 Y 时间,让右项窗口由 X + Y 时间定义。但是,这将给我们留下不应包含的项目。因此,我们在时间戳上使用Where来过滤掉它们。

最后,我们选择匿名类型和时间戳,并将其全部组合在一起。

我不认为GroupJoin是要走的路:你最终会拆散这个团体并重建它,就像我所做的一样。

相关内容

  • 没有找到相关文章

最新更新