我使用的是这里提供的RX生成器的一个稍微修改过的版本:
http://mnajder.blogspot.com/2011/09/when-reactive-framework-meets-f-30.html
我的计算表达式不是直接取IObservable<'T>
,而是一种类型:
type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)
let extract (MyType t) = t
然后组合器采用以下形式:
let where (f: 'b -> bool) (m:MyType<_,'b>) = MyType(fun input -> (extract m input).Where(f))
在表达式本身中,我经常需要引用返回到流中的以前的值。为了做到这一点,我定义了一个MyType
,它维护了n
最新值的滚动不可变列表。
let history n =
MyType(fun input ->
Observable.Create(fun (o:IObserver<_>) ->
let buffer = new History<_>(n)
o.OnNext(HistoryReadOnly(buffer))
input.Subscribe(buffer.Push, o.OnError, o.OnCompleted)
)
)
有了这个,我现在可以做一些类似的事情:
let f = obs {
let! history = history 20
// Run some other types, and possibly do something with history
}
我发现我经常使用这个历史记录,理想情况下我希望将其直接嵌入到IObservable<'a>
中。显然我做不到。所以我的问题是,什么是合理的方式来介绍我在这里的历史概念。我应该扩展IObservable<'T>
(不确定如何做到这一点),包装IObservable<'T>
吗?
我感谢你的建议。
编辑:添加了完整的示例代码。
open System
open System.Collections.Generic
open System.Reactive.Subjects
open System.Reactive.Linq
// Container function
type MyType<'a,'b> = MyType of (IObservable<'a> -> IObservable<'b>)
let extract (MyType t) = t
// Mini Builder
let internal mbind (myTypeB:MyType<'a,'b>) (f:'b -> MyType<'a,'c>) =
MyType(fun input ->
let obsB = extract myTypeB input
let myTypeC= fun resB -> extract (f resB) input
obsB.SelectMany(myTypeC)
)
type MyTypeBuilder() =
member x.Bind (m,f) = mbind m f
member x.Combine (a,b) = MyType(fun input -> (extract a input).Concat(extract b input))
member x.Yield (r) = MyType(fun input -> Observable.Return(r))
member x.YieldFrom (m:MyType<_,_>) = m
member x.Zero() = MyType(fun input -> Observable.Empty())
member x.Delay(f:unit -> MyType<'a,'b>) = f()
let mtypeBuilder = new MyTypeBuilder()
// Combinators
let simplehistory =
MyType(fun input ->
Observable.Create(fun (o:IObserver<_>) ->
let buffer = new List<_>()
o.OnNext(buffer)
input.Subscribe(buffer.Add, o.OnError, o.OnCompleted)
)
)
let where (f: 'b -> bool) m = MyType(fun input -> (extract m input).Where(f))
let take (n:int) m = MyType(fun input -> (extract m input).Take(n))
let buffer m = MyType(fun input -> (extract m input).Buffer(1))
let stream = MyType(id)
// Example
let myTypeResult (t:MyType<'a,'b>) (input:'a[]) = (extract t (input.ToObservable().Publish().RefCount())).ToArray().Single()
let dat = [|1 .. 20|]
let example = mtypeBuilder {
let! history = simplehistory
let! someEven = stream |> where(fun v -> v % 2 = 0) // Foreach Even
let! firstValAfterPrevMatch = stream |> take 1 // Potentially where a buffer operation would run, all values here are after i.e. we cant get values before last match
let! odd = stream |> where(fun v -> v % 2 = 1) |> take 2 // Take 2 odds that follow it
yield (history.[history.Count - 1], history.[0], someEven,firstValAfterPrevMatch, odd) // Return the last visited item in our stream, the very first item, an even, the first value after the even and an odd
}
let result = myTypeResult example dat
val result : (int * int * int * int * int) [] =
[|(5, 1, 2, 3, 5); (7, 1, 2, 3, 7); (7, 1, 4, 5, 7); (9, 1, 4, 5, 9);
(9, 1, 6, 7, 9); (11, 1, 6, 7, 11); (11, 1, 8, 9, 11); (13, 1, 8, 9, 13);
(13, 1, 10, 11, 13); (15, 1, 10, 11, 15); (15, 1, 12, 13, 15);
(17, 1, 12, 13, 17); (17, 1, 14, 15, 17); (19, 1, 14, 15, 19);
(19, 1, 16, 17, 19)|]
使用标准的Rx工作流生成器,您可以创建一个函数history
来处理您的示例用例:
let history (io:IObservable<_>) =
io.Scan(new List<_>(), (fun l t -> l.Add t; l)).Distinct()
let io = new Subject<int>()
let newio = rx { let! history = history io
let! even = io.Where(fun v -> v % 2 = 0)
let! next = io.Take 1
let! odd = io.Where(fun v -> v % 2 = 1).Take 2
yield (history.Last(), history.[0], even, next, odd) }
newio.Subscribe(printfn "%O") |> ignore
for i = 1 to 20 do
io.OnNext i
扩展它以提供历史长度限制应该是微不足道的。你需要定义自己的类型/构建者有什么具体的原因吗?或者这样做只是为了达到这样的目的?
下面是一个组合子的例子。您只需要定义rx
块之外的可观察对象。您可以让history
以不同的方式处理不可变的历史记录,而不是持久列表,这样就可以满足您的需求。
let history (io:IObservable<_>) =
io.Scan(new List<_>(), (fun l t -> l.Add t; l))
let newest (hist:'a List) = hist.Last()
let extract (ioHist:'a List IObservable) = ioHist.Select newest
let take (i:int) (ioHist: 'a List IObservable) = ioHist.Take i
let where (f: 'a -> bool) (ioHist: 'a List IObservable) = ioHist.Where(fun hist -> f(newest hist))
let io = new Subject<int>()
let history1 = history io
let newio =
rx { let! hist = history1.Distinct()
let! even = extract (history1 |> where (fun v -> v % 2 = 0))
let! next = extract (history1 |> take 1)
let! odd = extract (history1 |> where (fun v -> v % 2 = 1) |> take 2)
yield (hist.Last(), hist.[0], even, next, odd) }
您已经可以使用Observable.Buffer来完成此操作了。对不起,我的F#帽子今天没想。
IObservable<int> source = ...
IOBservable<IList<int>> buffered = source.Buffer(5,1)
将为您创建一个列表流。
或者尝试在LINQ中使用缓冲区,这更像F#查询表达式
Console.WriteLine ("START");
var source = new List<int> () { 1, 2, 3, 4, 5 }.ToObservable ();
// LINQ C#'s Monad sugar
var r =
from buffer in source.Buffer (3, 1)
from x in buffer
from y in buffer
select new { x,y};
r.Subscribe (o=>Console.WriteLine (o.x + " " + o.y));
Console.WriteLine ("END");
请注意,LINQ中的from
与f#查询表达式中的let!
完全/几乎相同。结果如下。还要注意我稍后在expression
中如何使用buffer
,就像在f#查询表达式中一样。
START
1 1
1 2
1 3
2 1
2 2
2 3
3 1
3 2
3 3
2 2
2 3
2 4
3 2
3 3
3 4
4 2
4 3
4 4
3 3
3 4
3 5
4 3
4 4
4 5
5 3
5 4
5 5
4 4
4 5
5 4
5 5
5 5
END
对不起,我的F#非常生疏,但也许您正在寻找Scan
运算符。它将在源生成值时将值推送到累加器,然后您可以使用该累加器为投影生成下一个值。
在这里(在C#中,很抱歉),我们取一个[0..10]的序列,它产生的值间隔100ms,然后我们返回一个运行的Sum。
var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
.Take(10);
source.Scan(
new List<long>(), //Seed value
(acc, value)=> //Accumulator function
{
acc.Add(value);
return acc;
}
)
.Select(accumate=>accumate.Sum())
其产生间隔100ms的值[0,1,3,6,10,15,21,28,36,45]。
我认为有了这个工具,你可以管理你的值的历史记录(通过将它们添加到历史记录/累加器中),然后在选择中使用这个历史记录来投影合适的值。