反应扩展将两个IOobservable合并为一个



我试图将包含请求信息的IObservable与包含响应信息的IObservable结合起来。请求和响应信息都包含一个Id和一个DateTime形式的时间戳。请求上的Id将与响应上的Id匹配。

我需要根据Id组合这两个流,并计算请求的时间戳和响应的时间戳之间的时间。实现这一目标的最佳方法是什么:

您可以巧合地加入。

from request in requests             // 1
join response in responses           // 2
on responses.Where(response => response.Id == request.Id) // 1,3
equals Observable.Empty<Unit>()      // 2
where request.Id == response.Id      // 4
select response.Time - request.Time  // 5

这样想:

  1. 为每个请求创建一个窗口
  2. 将每个响应投影到每个请求窗口中
  3. 当响应与请求匹配时,关闭该请求窗口
  4. 对于每个请求,忽略所有不匹配的响应
  5. 为匹配的响应投影时间差

更新:

请求窗口由from打开,由on关闭。

响应窗口由join打开,由equals关闭。equals指定的响应持续时间是Empty,因此响应实际上不是窗口——它们被视为点事件,只需一次投影到当前打开的任何请求窗口中。

#3需要CCD_ 7运算符,因为它定义了每个请求的持续时间;即,它指示请求窗口何时关闭,即具有匹配ID的响应何时到达。

#因为#3,所以需要4。想想看,我们正在将每个响应与当前打开的每个请求窗口交叉连接。只有当匹配的响应到达时,请求窗口才会关闭。这意味着每个不匹配的响应都会与每个请求配对,但你只关心最后一个响应——匹配的那个。

如果您没有并发请求,Zip可能是您要寻找的操作员:

http://rxmarbles.com/#zip

Zip将在一个序列中组合两个可观测值,该序列以被组合的最慢可观测值定义的速率产生元素。

如果您有并发请求,您可能应该尝试一种不同的方法,这种方法不需要将这两个可观测值组合起来。例如,您可以将请求ID和DateTime存储在ConcurrentDictionary中,并订阅可观察到的响应,在那里您可以从ConcurrentDiction中查找相应的请求(阅读后不要忘记删除条目)。

相关内容

  • 没有找到相关文章

最新更新