嗨,我正试图将一些可观察对象合并到一个可观察数组。这里有一个在fsi中工作的例子。(很抱歉很长)
#r "./bin/Debug/System.Reactive.dll"
open System
open System.Reactive.Linq
/// Subscribes to the Observable with all 3 callbacks.
let subscribeComplete next error completed (observable: IObservable<'T>) =
observable.Subscribe(
(fun x -> next x),
(fun e -> error e),
(fun () -> completed()))
/// Subscribes to the Observable with a next and an error-function.
let subscribeWithError next error observable =
subscribeComplete next error (fun () -> ()) observable
/// Subscribes to the Observable with a next-function
let subscribe (next: 'T -> unit) (observable: IObservable<'T>) : IDisposable =
subscribeWithError next ignore observable
/// Static method to generate observable from input functions
let ObsGenerate (initState: 'TS) (termCond: 'TS -> bool) (iterStep: 'TS -> 'TS)
(resSelect: 'TS -> 'TR) (timeSelect : 'TS -> System.TimeSpan) =
Observable.Generate(initState, termCond, iterStep, resSelect, timeSelect)
//maps the given observable with the given function
let obsMap (f: 'T -> 'U) (observable : IObservable<'T>) : IObservable<'U> =
Observable.Select(observable, Func<_,_>(f))
/// Merges two observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest (obs1: IObservable<'T>) (obs2: IObservable<'U>) : IObservable<'T * 'U> =
Observable.CombineLatest(
obs1, obs2, Func<_,_,_>(fun a b -> a, b))
/// Merges three observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest3 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) : IObservable<'T * 'U * 'V> =
let obs12 =obs1.CombineLatest(obs2, Func<_,_,_>(fun a b -> a, b))
obs12.CombineLatest(obs3, Func<_,_,_>(fun (a,b) c -> a, b, c))
/// Merges four observable sequences into one observable sequence whenever one of the observable sequences has a new value.
let combineLatest4 (obs1: IObservable<'T>) (obs2: IObservable<'U>) (obs3: IObservable<'V>) (obs4: IObservable<'W>) : IObservable<'T * 'U * 'V * 'W> =
let obsNew = combineLatest3 obs1 obs2 obs3
obsNew.CombineLatest(obs4, Func<_,_,_>(fun (a,b,c) d -> a, b, c, d))
// second section generating arrays
let combineLatestArray (obs1: IObservable<'T>) (obs2: IObservable<'T>) =
combineLatest obs1 obs2
|> obsMap (fun (a, b) -> [a; b] |> List.toArray)
let combineLatest3Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) =
combineLatest3 obs1 obs2 obs3
|> obsMap (fun (a, b, c) -> [a; b; c] |> List.toArray)
let combineLatest4Array (obs1: IObservable<'T>) (obs2: IObservable<'T>) (obs3: IObservable<'T>) (obs4: IObservable<'T>) =
combineLatest4 obs1 obs2 obs3 obs4
|> obsMap (fun (a, b, c, d) -> [a; b; c; d] |> List.toArray)
let combineLatestListToArray (list: IObservable<'T> List) =
match list.Length with
| 2 -> combineLatestArray list.[0] list.[1]
| 3 -> combineLatest3Array list.[0] list.[1] list.[2]
| 4 -> combineLatest4Array list.[0] list.[1] list.[2] list.[3]
| _ -> failwith "combine latest on unsupported list size"
type FooType =
{ NameVal : string
IdVal : int
RetVal : float }
member x.StringKey() =
x.NameVal.ToString() + ";" + x.IdVal.ToString()
// example code starts here
let rnd = System.Random()
let fooListeners = Collections.Generic.Dictionary()
let AddAFoo (foo : FooType) =
let fooId = foo.StringKey()
if fooListeners.ContainsKey(fooId)
then fooListeners.[fooId]
else
let myObs = ObsGenerate {NameVal = foo.NameVal; IdVal = foo.IdVal; RetVal = foo.RetVal} (fun x -> true) (fun x -> {NameVal = (x.NameVal); IdVal = (x.IdVal); RetVal = (x.RetVal + rnd.NextDouble() - 0.5)}) (fun x -> x) (fun x -> System.TimeSpan.FromMilliseconds(rnd.NextDouble() * 2000.0))
fooListeners.Add(fooId,myObs)
myObs
let fooInit = [6..9]
|> List.map (fun index -> {NameVal = (string index + "st"); IdVal = index; RetVal = (float index + 1.0)})
|> List.map (fun foo -> AddAFoo foo)
let fooValuesArray = fooInit
|> List.map(fun x -> (x |> obsMap (fun x -> x.RetVal)))
|> combineLatestListToArray
let mySub =
fooValuesArray
|> subscribe (fun fooVals -> printfn "fooArray: %A" fooVals)
//execute until here to start example
// execute this last line to unsubscribe
mySub.Dispose()
关于这段代码,我有两个问题:
是否有更聪明的方法将可观察对象合并到数组中?(它变得非常长,因为我需要合并更大的数组)
我想节流更新。我的意思是,我希望在(比如说)相同的半秒窗口内发生的所有更新都作为数组上的一次更新来处理。理想情况下,我希望这个窗口只在第一个更新进来时打开,即如果2秒内没有更新到达,那么一个更新到达,然后我们等待并包含0.5秒的进一步更新,然后触发可观察对象。我不希望它每隔0.5秒周期性发布一次,尽管没有触发可观察对象。我希望这个描述足够清楚。
更新:我已经决定接受其中一个f#的答案,但是我还没有完成c#的答案。我希望能尽快检查它们。
对于1,List.fold
和List.toArray
以及一些Observable
算子的应用应该可以很好地工作。比如:
let combineLatest observables =
Observable.Select(
(observables
|> List.fold (fun ol o
-> Observable.CombineLatest(o, ol, (fun t tl -> t :: tl))
) (Observable.Return<_>([]))
),
List.toArray)
由于嵌套,如果你有一个大的可观察对象列表,你的可能会以性能问题告终,但在你诉诸于手工编写之前,至少值得尝试一下。
对于2,我同意对结果应用Throttling的其他答案。
我很抱歉这不是f# -我希望我有时间学习它-但这里有一个可能的答案在c#
这里有一组扩展方法,可以将最新的IEnumerable<IObservable<T>>
扩展到IObservable<IEnumerable<T>>
:
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IObservable<T> second)
{
if (first == null) throw new ArgumentNullException("first");
if (second == null) throw new ArgumentNullException("second");
return first.CombineLatest(second, (t0, t1) => EnumerableEx.Return(t0).Concat(EnumerableEx.Return(t1)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IObservable<T> second)
{
if (firsts == null) throw new ArgumentNullException("firsts");
if (second == null) throw new ArgumentNullException("second");
return firsts.CombineLatest(second, (t0s, t1) => t0s.Concat(EnumerableEx.Return(t1)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources)
{
if (sources == null) throw new ArgumentNullException("sources");
return sources.CombineLatest(() => sources.First().CombineLatest(sources.Skip(1)), () => Observable.Empty<IEnumerable<T>>());
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<T> first, IEnumerable<IObservable<T>> seconds)
{
if (first == null) throw new ArgumentNullException("first");
if (seconds == null) throw new ArgumentNullException("seconds");
return seconds.CombineLatest(() => first.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => first.Select(t => EnumerableEx.Return(t)));
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IObservable<IEnumerable<T>> firsts, IEnumerable<IObservable<T>> seconds)
{
if (firsts == null) throw new ArgumentNullException("firsts");
if (seconds == null) throw new ArgumentNullException("seconds");
return seconds.CombineLatest(() => firsts.CombineLatest(seconds.First()).CombineLatest(seconds.Skip(1)), () => firsts);
}
private static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> sources, Func<IObservable<IEnumerable<T>>> any, Func<IObservable<IEnumerable<T>>> none)
{
if (sources == null) throw new ArgumentNullException("sources");
if (any == null) throw new ArgumentNullException("any");
if (none == null) throw new ArgumentNullException("none");
return Observable.Defer(() => sources.Any() ? any() : none());
}
它们可能不是很有效,但它们确实可以处理需要组合的任意数量的可观察对象。
我很想看到这些方法转换成f#。
关于你的第二个问题,我不确定我能回答你到目前为止所说的,因为CombineLatest
和Throttle
都失去了值,所以在尝试回答之前更详细地了解你的用例可能是谨慎的。
尽管Gideon Engelberth已经用一种可能的解决方法回答了你的问题。其他可能的方式可能像下面这样,它不使用嵌套。
let combineLatestToArray (list : IObservable<'T> list) =
let s = new Subject<'T array>()
let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
let cb (i:int,v:'T) =
arr.[i] <- v
s.OnNext(arr |> Array.toList |> List.toArray)
let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
|> Observable.Merge
main.Subscribe(new Action<int * 'T>(cb)
,new Action<exn>(fun e -> s.OnError(e))
,new Action(fun () -> s.OnCompleted()) ) |> ignore
s :> IObservable<'T array>
让我知道这是否解决了你的问题,因为我还没有测试它:)注意:这是第一部分,对于第二部分,每个人都已经提到了你需要做什么
更新:另一个实现:
let combineLatestToArray (list : IObservable<'T> list) =
let s = new Subject<'T array>()
let arr = Array.init list.Length (fun _ -> Unchecked.defaultof<'T>)
let main = list |> List.mapi (fun i o -> o.Select(fun t -> (i,t)))
|> Observable.Merge
async {
try
let se = main.ToEnumerable() |> Seq.scan (fun ar (i,t) -> Array.set ar i t; ar) arr
for i in se do
s.OnNext(i |> Array.toList |> List.toArray)
s.OnCompleted()
with
| :? Exception as ex -> s.OnError(ex)
} |> Async.Start
s :> IObservable<'T array>
-
似乎
Observable.Merge()
对IObservables
的可变数量有过载更接近你想要的。 -
Observable.Buffer()
与时间过载将在这里做你想要的。在"no events"的情况下,Buffer仍然会OnNext()一个空列表,让您对这种情况作出反应。
这是我能想到的最好的。我一直想解决这个问题。
public static class Extensions
{
public static IObservable<IEnumerable<T>> CombineLatest<T>(this Observable observable, IEnumerable<IObservable<T>> observableCollection)
{
return observableCollection.CombineLatest();
}
public static IObservable<IEnumerable<T>> CombineLatest<T>(this IEnumerable<IObservable<T>> observables)
{
return observables.Aggregate<IObservable<T>, IObservable<IEnumerable<T>>>
(
Observable.Return(Enumerable.Empty<T>()),
(o, n) => o.CombineLatest
(
n,
(list, t) => list.Concat(EnumerableEx.Return(t))
)
);
}
}
一个例子是:
var obs = new List<IObservable<bool>>
{
Observable.Return(true),
Observable.Return(false),
Observable.Return(true)
};
var result = obs.CombineLatest().Select(list => list.All(x => x));
result.Subscribe(Console.WriteLine);
Console.ReadKey();
但是,您必须知道,在所有可观察对象都产生值之前,生成的IObservable<IEnumerable<T>>
不会触发。这是我在我的场景中需要的,但可能不适合您的场景。
我担心的是所有. concats的性能。在扩展方法中处理可变集合可能会更高效。不确定。
对不起,我不知道f#。总有一天我会抽出时间的。
节流是在你得到最终的可观察对象之后用.Throttle
操作符完成的。
编辑:这个答案是Enigmativity的递归Yang的迭代Ying