Reactive Extensions具有扩展方法:
Join(this IObservable<TLeft> left, IObservable<TRight> right,
Func<TLeft, IObservable<TLeftDuration>> leftDurationSelector,
Func<TRight, IObservable<TRightDuration>> rightDurationSelector,
Func<TLeft, TRight, TResult> resultSelector)
但是,不存在也将Func<TLeft, TRight, bool>
作为联接条件的重载。因此,此方法返回左和右的完整笛卡尔乘积。
我已经将其实现为
public static IObservable<TResult> Join<TLeft, TRight, TResult>(
this IObservable<TLeft> left, IObservable<TRight> right, Func<TLeft, TRight, bool> joinCondition,
Func<TLeft, TRight, TResult> resultSelector)
{
return left.Join(right, l => Observable.Never<TLeft>(), r => Observable.Never<TRight>(),
(l, r) => new Tuple<TLeft, TRight>(l, r))
.Where(CurryTuple(joinCondition))
.Select(CurryTuple(resultSelector));
}
private static Func<Tuple<TLeft, TRight>, TResult> CurryTuple<TLeft, TRight, TResult>(Func<TLeft, TRight, TResult> func)
{
return tuple => func(tuple.Item1, tuple.Item2);
}
随着我越来越多地使用它,我想知道我是否需要更多类似的方法(也许是外部联接——无论这对Observables意味着什么)。与其试图实现所有这些(并带来潜在的错误和次优代码),我想知道是否有其他人也遇到过同样的问题,以及是否有已经编写好的库。
有什么想法吗?
我认为您误解了反应联接的工作方式。联接条件是事件重合,而不是您提供的二进制运算。基本上,每个事件都有一个持续时间,同时存在的左源和右源的事件被连接起来。请看此视频了解更多解释。
如果持续时间是非终止的,则反应联接的结果是笛卡尔乘积——正如您在样本中所做的那样。你所做的事情会带来一些非常糟糕的性能特征。
你能提供大理石图、单元测试和/或你试图实现的示例场景吗?
我怀疑不使用Join会有更好的方法。