我试图将包含请求信息的IObservable与包含响应信息的IObservable结合起来。请求和响应信息都包含一个Id和一个DateTime形式的时间戳。请求上的Id将与响应上的Id匹配。
我需要根据Id组合这两个流,并计算请求的时间戳和响应的时间戳之间的时间。实现这一目标的最佳方法是什么:
您可以巧合地加入。
from request in requests // 1
join response in responses // 2
on responses.Where(response => response.Id == request.Id) // 1,3
equals Observable.Empty<Unit>() // 2
where request.Id == response.Id // 4
select response.Time - request.Time // 5
这样想:
- 为每个请求创建一个窗口
- 将每个响应投影到每个请求窗口中
- 当响应与请求匹配时,关闭该请求窗口
- 对于每个请求,忽略所有不匹配的响应
- 为匹配的响应投影时间差
更新:
请求窗口由from
打开,由on
关闭。
响应窗口由join
打开,由equals
关闭。equals
指定的响应持续时间是Empty
,因此响应实际上不是窗口——它们被视为点事件,只需一次投影到当前打开的任何请求窗口中。
#3需要CCD_ 7运算符,因为它定义了每个请求的持续时间;即,它指示请求窗口何时关闭,即具有匹配ID的响应何时到达。
#因为#3,所以需要4。想想看,我们正在将每个响应与当前打开的每个请求窗口交叉连接。只有当匹配的响应到达时,请求窗口才会关闭。这意味着每个不匹配的响应都会与每个请求配对,但你只关心最后一个响应——匹配的那个。
如果您没有并发请求,Zip可能是您要寻找的操作员:
http://rxmarbles.com/#zip
Zip将在一个序列中组合两个可观测值,该序列以被组合的最慢可观测值定义的速率产生元素。
如果您有并发请求,您可能应该尝试一种不同的方法,这种方法不需要将这两个可观测值组合起来。例如,您可以将请求ID和DateTime存储在ConcurrentDictionary中,并订阅可观察到的响应,在那里您可以从ConcurrentDiction中查找相应的请求(阅读后不要忘记删除条目)。