动画图表与观察基于无限序列



我在使用FSharp创建我的一些数据的动画可视化时遇到了麻烦。图表FSharp.Control.Reactive.

本质上,我有一个无限的点生成器。我的实际生成器比下面的示例更复杂,但是这个简化的示例再现了这个问题:

let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))

这个move函数的类型是(int * int) list -> seq<(int * int) list>。对于每次迭代,它将输入列表中的所有点平移1,以便点向上和向右移动。

我想在LiveChart.Point上显示这个。由move生成的序列可以转换为适当的Observable,但它本身运行得相当快,所以我首先将其放慢一点:

// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }

这使我能够从点的序列中创建一个Observable:

let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable

我还可以看到,如果我打印到控制台,它可以工作:

obs.Subscribe (fun x -> printfn "%O" x)

通过打印到控制台,很明显这会阻塞执行环境;例如,如果您将脚本发送到 f# Interactive (FSI),它将继续打印,并且您必须取消评估或重置会话以停止它。

我的理论是,这是因为obs与执行环境在同一个线程上运行。

如果我试着把它变成LiveChart.Point,也会发生同样的情况:

let lc = obs |> LiveChart.Point
lc.ShowChart()

如果我将此发送给FSI,什么也没有发生(没有显示图表),并且FSI阻塞。

这似乎与我的理论一致,即观察者与图表运行在同一个线程上。

如何使观察者在不同的线程上运行?

我发现了Observable.observeOn,它需要一个IScheduler。浏览MSDN,我发现了NewThreadScheduler, ThreadPoolScheduler和TaskPoolScheduler,它们都实现了IScheduler。这些类的名字听起来很有希望,但是我找不到它们!

根据文档,它们都在System.Reactive.dll中定义,但是当我拥有FSharp.Control的所有依赖项时。反应式,我没有那个组件。搜索互联网也没有显示在哪里可以得到它。

是响应式扩展的旧版本还是新版本?我找对方向了吗?

我如何在LiveChart上可视化我的无穷点序列?


下面是重现这个问题的完整脚本:

#r @"../packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"../packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"../packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"../packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"../packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"
open System
open FSharp.Control.Reactive
open FSharp.Charting
let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))
// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }
let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
//obs.Subscribe (fun x -> printfn "%O" x)
let lc = obs |> LiveChart.Point
lc.ShowChart()

安装的NuGet包:

Id                                  Versions
--                                  --------
FSharp.Charting                     {0.90.12}
FSharp.Control.Reactive             {3.2.0}
FSharp.Core                         {3.1.2}
Rx-Core                             {2.2.5}
Rx-Interfaces                       {2.2.5}
Rx-Linq                             {2.2.5}

诀窍是使用subscribeOn。From introtorx.com on subscribeOn and observeOn:

在这里我想指出的一个陷阱是,最初几次使用这些重载时,我对它们的实际作用感到困惑。您应该使用SubscribeOn方法来描述您希望如何调度任何预热和后台处理代码。例如,如果你要对Observable使用SubscribeOn。创建时,传递给Create方法的委托将在指定的调度程序上运行。

ObserveOn方法用于声明您希望通知被调度到的位置。我建议在STA系统(最常见的UI应用程序)中使用ObserveOn方法是最有用的。

完整脚本如下:

#r @"packages/Rx-Interfaces.2.2.5/lib/net45/System.Reactive.Interfaces.dll"
#r @"packages/Rx-PlatformServices.2.2.5/lib/net45/System.Reactive.PlatformServices.dll"
#r @"packages/Rx-Core.2.2.5/lib/net45/System.Reactive.Core.dll"
#r @"packages/Rx-Linq.2.2.5/lib/net45/System.Reactive.Linq.dll"
#r @"packages/FSharp.Control.Reactive.3.2.0/lib/net40/FSharp.Control.Reactive.dll"
#r @"packages/FSharp.Charting.0.90.12/lib/net40/FSharp.Charting.dll"
#r "System.Windows.Forms.DataVisualization"
open System
open FSharp.Control.Reactive
open FSharp.Charting
open System.Reactive.Concurrency
let move points =
    Seq.initInfinite (fun i -> points |> List.map (fun (x, y) -> x + i, y + i))
// Slow down any sequence
let delay (duration : TimeSpan) xs = seq {
    for x in xs do
    duration.TotalMilliseconds |> int |> Async.Sleep |> Async.RunSynchronously
    yield x }
let obs =
    [(1, 0); (0, 1)]
    |> move
    |> delay (TimeSpan.FromSeconds 0.5)
    |> Observable.toObservable
    |> Observable.subscribeOn NewThreadScheduler.Default 
let lc = obs |> LiveChart.Point
lc.ShowChart()

通常在UI应用程序中,您将配对subscribeOn和observeOn以确保结果被传递回UI线程。这里似乎不需要,因为它看起来像图表处理这个为你(为我工作)。

调度器在System.Reactive.PlatformServices.dll(由Rx-PlatformServices包安装)中定义。

我在这里找到了它们:https://github.com/Reactive-Extensions/Rx.NET/tree/master/Rx.NET/Source/System.Reactive.PlatformServices/Reactive/Concurrency

(observable).ObserveOn(System.Reactive.Concurrency.NewThreadScheduler.Default)

相关内容

  • 没有找到相关文章

最新更新