将多个可观察对象合并到一个可观察数组中



嗨,我正试图将一些可观察对象合并到一个可观察数组。这里有一个在fsi中工作的例子。(很抱歉很长)

#r "./bin/Debug/System.Reactive.dll"
open System
open System.Reactive.Linq
/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) = 
    observable.Subscribe(
        (fun x -> next x),
        (fun e -> error e),
        (fun () -> completed()))
/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable = 
    subscribeComplete next error (fun () -> ()) observable
/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable = 
    subscribeWithError next ignore observable
/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
        (resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
            Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)
//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
    Observable.Select(observable, Func<_,_>(f))
/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> = 
    Observable.CombineLatest(
        obs1, obs2, Func<_,_,_>(fun a b -> a, b))    
/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> = 
    let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))    
    obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))    
/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> = 
    let obsNew = combineLatest3 obs1 obs2 obs3
    obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))    
// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
    combineLatest obs1 obs2 
    |> obsMap (fun (a, b) -> [a; b] |> List.toArray)
let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
    combineLatest3 obs1 obs2 obs3 
    |> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)
let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
    combineLatest4 obs1 obs2 obs3 obs4
    |> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)
let combineLatestListToArray (list: IObservable<'T> List) =
    match list.Length with
    | 2 -> combineLatestArray list.[0] list.[1]
    | 3 -> combineLatest3Array list.[0] list.[1] list.[2]
    | 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
    | _ -> failwith "combine latest on unsupported list size"
type FooType = 
        {   NameVal :   string
            IdVal   :   int
            RetVal  :   float }
        member x.StringKey() =
            x.NameVal.ToString() + ";" + x.IdVal.ToString()

// example code starts here
let rnd = System.Random()
let fooListeners = Collections.Generic.Dictionary()
let AddAFoo (foo : FooType) =
    let fooId = foo.StringKey()
    if fooListeners.ContainsKey(fooId)
        then fooListeners.[fooId]
    else
        let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
        fooListeners.Add(fooId,myObs)
        myObs
let fooInit =   [6..9]
                |> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})     
                |> List.map (fun foo -> AddAFoo foo)
let fooValuesArray =    fooInit
                        |> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
                        |> combineLatestListToArray
let mySub =
    fooValuesArray
    |> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)
//execute until here to start example
// execute this last line to unsubscribe
mySub.Dispose()
关于这段代码,我有两个问题:
  1. 是否有更聪明的方法将可观察对象合并到数组中?(它变得非常长,因为我需要合并更大的数组)

  2. 我想节流更新。我的意思是,我希望在(比如说)相同的半秒窗口内发生的所有更新都作为数组上的一次更新来处理。理想情况下,我希望这个窗口只在第一个更新进来时打开,即如果2秒内没有更新到达,那么一个更新到达,然后我们等待并包含0.5秒的进一步更新,然后触发可观察对象。我不希望它每隔0.5秒周期性发布一次,尽管没有触发可观察对象。我希望这个描述足够清楚。

更新:我已经决定接受其中一个f#的答案,但是我还没有完成c#的答案。我希望能尽快检查它们。

对于1,List.foldList.toArray以及一些Observable算子的应用应该可以很好地工作。比如:

let combineLatest observables =
    Observable.Select(
        (observables 
         |> List.fold (fun ol o 
                         -> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
                      ) (Observable.Return<_>([]))
        ), 
        List.toArray)

由于嵌套,如果你有一个大的可观察对象列表,你的可能会以性能问题告终,但在你诉诸于手工编写之前,至少值得尝试一下。

对于2,我同意对结果应用Throttling的其他答案。

我很抱歉这不是f# -我希望我有时间学习它-但这里有一个可能的答案在c#

这里有一组扩展方法,可以将最新的IEnumerable<IObservable<T>>扩展到IObservable<IEnumerable<T>>:

public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
    if (first == null) throw new ArgumentNullException("first");
    if (second == null) throw new ArgumentNullException("second");
    return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (second == null) throw new ArgumentNullException("second");
    return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
    if (sources == null) throw new ArgumentNullException("sources");
    return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
    if (first == null) throw new ArgumentNullException("first");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
    if (firsts == null) throw new ArgumentNullException("firsts");
    if (seconds == null) throw new ArgumentNullException("seconds");
    return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}
private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
    if (sources == null) throw new ArgumentNullException("sources");
    if (any == null) throw new ArgumentNullException("any");
    if (none == null) throw new ArgumentNullException("none");
    return Observable.Defer(() => sources.Any() ? any() : none());
}

它们可能不是很有效,但它们确实可以处理需要组合的任意数量的可观察对象。

我很想看到这些方法转换成f#。

关于你的第二个问题,我不确定我能回答你到目前为止所说的,因为CombineLatestThrottle都失去了值,所以在尝试回答之前更详细地了解你的用例可能是谨慎的。

尽管Gideon Engelberth已经用一种可能的解决方法回答了你的问题。其他可能的方式可能像下面这样,它不使用嵌套。

let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let cb (i:int,v:'T) = 
        arr.[i] <- v
        s.OnNext(arr |> Array.toList |> List.toArray)
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    main.Subscribe(new Action<int * 'T>(cb)
                   ,new Action<exn>(fun e -> s.OnError(e)) 
                   ,new Action(fun () -> s.OnCompleted()) ) |> ignore
    s :> IObservable<'T array>

让我知道这是否解决了你的问题,因为我还没有测试它:)注意:这是第一部分,对于第二部分,每个人都已经提到了你需要做什么

更新:另一个实现:

let combineLatestToArray (list : IObservable<'T> list) = 
    let s = new Subject<'T array>()
    let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
    let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
               |> Observable.Merge
    async {
        try
            let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
            for i in se do
                s.OnNext(i |> Array.toList |> List.toArray)
            s.OnCompleted()
        with
        | :? Exception as ex -> s.OnError(ex)
    } |> Async.Start
    s :> IObservable<'T array>
  1. 似乎Observable.Merge()IObservables的可变数量有过载更接近你想要的。

  2. Observable.Buffer()与时间过载将在这里做你想要的。在"no events"的情况下,Buffer仍然会OnNext()一个空列表,让您对这种情况作出反应。

这是我能想到的最好的。我一直想解决这个问题。

public static class Extensions
{
    public static IObservable<IEnumerable<T>> CombineLatest<T>(this Observable observable, IEnumerable<IObservable<T>> observableCollection)
    {
        return observableCollection.CombineLatest();
    }
    public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
    {
        return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
        (
            Observable.Return(Enumerable.Empty<T>()),
            (o, n) => o.CombineLatest
            (
                n,
                (list, t) => list.Concat(EnumerableEx.Return(t))
            )
        );
    }
}

一个例子是:

var obs = new List<IObservable<bool>> 
{ 
    Observable.Return(true), 
    Observable.Return(false), 
    Observable.Return(true) 
};
var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();
但是,您必须知道,在所有可观察对象都产生值之前,生成的IObservable<IEnumerable<T>>不会触发。这是我在我的场景中需要的,但可能不适合您的场景。

我担心的是所有. concats的性能。在扩展方法中处理可变集合可能会更高效。不确定。

对不起,我不知道f#。总有一天我会抽出时间的。

节流是在你得到最终的可观察对象之后用.Throttle操作符完成的。

编辑:这个答案是Enigmativity的递归Yang的迭代Ying

相关内容

  • 没有找到相关文章

最新更新