响应式扩展观察者的问题



我正在开发一个使用响应式扩展的应用程序,遇到了以下问题:

假设我有两个观察者p和Q,我想建立第三个观察者R,如果两个p值没有Q, R输出0。如果P之后是Q,则R输出传递这些值的方法的结果,例如:

P0    Q0    ->    R0 = f(P0,Q0)    
P1          ->    R1 = 0    
P2    Q1    ->    R2 = f(P2,Q1)    
P3          ->    R3 = 0    
P4          ->    R4 = 0    
P5    Q2    ->    R5 = f(P5,Q2)
(...)

和值按以下顺序进入观察器:

P0 Q0 P1 P2 Q1 P3 P4 P5 Q2

谢谢你的帮助。

我想我有一个解决办法。

如果我假设你有以下定义:

IObservable<int> ps = ...;
IObservable<int> qs = ...;
Func<int, int, int> f = ...;

首先,我创建一个函数字典来计算最终值:

var fs = new Dictionary<string, Func<int, int, int?>>()
{
    { "pp", (x, y) => 0 },
    { "pq", (x, y) => f(x, y) },
    { "qp", (x, y) => null },
    { "qq", (x, y) => null },
};

"p" &"q"在那里

然后你可以创建一个合并的观察对象,像这样:

var pqs =
    (from p in ps select new { k = "p", v = p })
        .Merge(from q in qs select new { k = "q", v = q });

我现在知道哪个序列产生了哪个值。

接下来,我发布合并列表,因为我不知道源可观察对象是热的还是冷的——所以发布它们会使它们成为热的——然后我将发布的可观察对象压缩到自己,分别跳过1和0。然后我知道每一对值和它们来自的原始可观测值。这样就很容易应用字典函数(过滤掉任何空值)。

在这里:

var rs =
    from kvv in pqs.Publish(_pqs =>
        _pqs.Skip(1).Zip(_pqs, (pq1, pq0) => new
        {
            k = pq0.k + pq1.k,
            v1 = pq1.v,
            v0 = pq0.v
        }))
    let r = fs[kvv.k](kvv.v0, kvv.v1)
    where r.HasValue
    select r.Value;

这对你有用吗?

总体思路很简单:您合并p和Q,使用BufferWithCount(2)获得值对,然后根据您的逻辑处理对:

<>之前 P.Merge(Q).BufferWithCount(2).Select(values => { var first = values[0]; var second = values[1]; if (first is P && second is P || first is Q && second is Q) { return 0; } if (first is P) { return selector(first, second); } else // suppose Q, P is a valid sequence as well. { return selector(second, first); } }); 之前

现在困难的部分是合并p和Q,如果它们是不同的类型,然后在Select中区分它们。如果它们是相同的类型,你可以使用一些简单的方法,比如Enigmativity提出的方法,即

<>之前 var pqs = (from p in ps select new { k = "p", v = p }) .Merge(from q in qs select new { k = "q", v = q }); 之前

现在困难的部分是,如果它们是不同的类型,要合并它们,我们需要一些通用的包装类型,例如,Data。从Haskell:

<>之前 public abstract class Either<TLeft, TRight> { private Either() { } public static Either<TLeft, TRight> Create(TLeft value) { return new Left(value); } public static Either<TLeft, TRight> Create(TRight value) { return new Right(value); } public abstract TResult Match<TResult>( Func<TLeft, TResult> onLeft, Func<TRight, TResult> onRight); public sealed class Left : Either<TLeft, TRight> { public Left(TLeft value) { this.Value = value; } public TLeft Value { get; private set; } public override TResult Match<TResult>( Func<TLeft, TResult> onLeft, Func<TRight, TResult> onRight) { return onLeft(this.Value); } } public sealed class Right : Either<TLeft, TRight> { public Right(TRight value) { this.Value = value; } public TRight Value { get; private set; } public override TResult Match<TResult>( Func<TLeft, TResult> onLeft, Func<TRight, TResult> onRight) { return onRight(this.Value); } } } 之前有趣的是,在system . react .dll中已经有类似的类了,不幸的是它是内部的,所以我们需要自己的实现。现在我们可以将P和Q都放入任意一个,并继续求解(我已经将其一般化了一点,因此您可以返回任何结果,而不是只返回int):<>之前 public static IObservable<TResult> SmartZip<TLeft, TRight, TResult>( IObservable<TLeft> leftSource, IObservable<TRight> rightSource, Func<TLeft, TRight, TResult> selector) { return Observable .Merge( leftSource.Select(Either<TLeft, TRight>.Create), rightSource.Select(Either<TLeft, TRight>.Create)) .BufferWithCount(2) .Select(values => { // this case was not covered in your question, // but I've added it for the sake of completeness. if (values.Count < 2) { return default(TResult); } var first = values[0]; var second = values[1]; // pattern-matching in C# is really ugly. return first.Match( left => second.Match( _ => default(TResult), right => selector(left, right)), right => second.Match( left => selector(left, right), _ => default(TResult))); }); } 之前

这里是所有这些可怕的丑陋的东西的一个小演示。

<>之前 private static void Main(string[] args) { var psource = Observable .Generate(1, i => i < 100, i => i, i => i + 1) .Zip(Observable.Interval(TimeSpan.FromMilliseconds(10.0)), (i, _) => i); var qsource = Observable .Generate(1, i => i < 100, i => (double)i * i, i => i + 1) .Zip(Observable.Interval(TimeSpan.FromMilliseconds(30.0)), (i, _) => i); var result = SmartZip( psource, qsource, (p, q) => q / p).ToEnumerable(); foreach (var item in result) { Console.WriteLine(item); } }

如果我正确理解了你的问题,那么下面是一个可以处理这种情况的通用函数:

public static IObservable<T> MyCombiner<T>(IObservable<T> P, IObservable<T> Q, T defaultValue,Func<T,T,T> fun)
        {
            var c = P.Select(p => new { Type = 'P', Value = p })
                        .Merge(Q.Select(p => new { Type = 'Q', Value = p }));
            return c.Zip(c.Skip(1), (a, b) =>
            {
                if (a.Type == 'P' && b.Type == 'P')
                    return new { Ok = true, Value = defaultValue };
                if (a.Type == 'P' && b.Type == 'Q')
                    return new { Ok = true, Value = fun(a.Value, b.Value) };
                else
                    return new { Ok = false, Value = default(T) };
            }).Where(b => b.Ok).Select(b => b.Value);
        }

假设我们有两个方法

  1. Before,每当第一个可观察对象在第二个可观察对象之前产生元素时,通过使用选择器函数将两个可观察序列合并为一个可观察序列。
  2. 不包含,每当第一个可观察对象出现两个项目时,将一个可观察序列合并到另一个可观察序列中,而第二个可观察对象没有任何项目。

用这种方法问题几乎解决了。

IObservable<TP> P = // observer P
IObservable<TQ> Q = // observer Q
var PP = P.Without((prev, next) => 0, Q);
var PQ = P.Before(Q, (p,q) => f(p,q)); // apply the function
var ResultSecuence = PP.Merge(PQ);

这里有两个方法

public static class Observer
{
    /// <summary>
    /// Merges two observable sequences into one observable sequence by using the selector function 
    /// whenever the first observable produces an element rigth before the second one.
    /// </summary>
    /// <param name="first"> First observable source.</param>
    /// <param name="second">Second observable source.</param>
    /// <param name="resultSelector">Function to invoke whenever the first observable produces an element rigth before the second one.</param>
    /// <returns>
    /// An observable sequence containing the result of combining elements of both sources 
    /// using the specified result selector function.
    /// </returns>
    public static IObservable<TResult> Before<TLeft, TRight, TResult>(this IObservable<TLeft> first, IObservable<TRight> second, Func<TLeft, TRight, TResult> resultSelector)
    {
        var result = new Subject<TResult>();
        bool firstCame = false;
        TLeft lastLeft = default(TLeft);
        first.Subscribe(item =>
        {
            firstCame = true;
            lastLeft = item;
        });
        second.Subscribe(item =>
        {
            if (firstCame)
                result.OnNext(resultSelector(lastLeft, item));
            firstCame = false;
        });
        return result;
    }
    /// <summary>
    /// Merges an observable sequence into one observable sequence by using the selector function 
    /// every time two items came from <paramref name="first"/> without any item of any observable
    /// in <paramref name="second"/>
    /// </summary>
    /// <param name="first"> Observable source to merge.</param>
    /// <param name="second"> Observable list to ignore.</param>
    /// <param name="resultSelector">Function to invoke whenever the first observable produces two elements without any of the observables in the secuence produces any element</param>
    /// <returns>
    /// An observable sequence containing the result of combining elements
    /// using the specified result selector function.
    /// </returns>
    public static IObservable<TResult> Without<TLeft, TResult>(this IObservable<TLeft> first,  Func<TLeft, TLeft, TResult> resultSelector,params IObservable<object>[] second)
    {
        var result = new Subject<TResult>();
        bool firstCame = false;
        TLeft lastLeft = default(TLeft);
        first.Subscribe(item =>
        {
            if (firstCame)
                result.OnNext(resultSelector(lastLeft, item));
            firstCame = true;
            lastLeft = item;
        });
        foreach (var observable in second)
            observable.Subscribe(item => firstCame = false);
        return result;
    }        
}

最新更新