f# 3.0 - F# 3.0:系统异常:邮箱的多个等待读取器延续



我正在尝试运行一些邮箱处理器测试,似乎邮箱 Scan() 失败并显示"System.Exception:邮箱的多个等待读取器延续"。Async.Start 和 Async.StartImimmediate 等都会发生这种情况(Async.RunSyncly 也无法正常工作,因为那时只有一个处理器,在初始客户之后没有客户)。

这是演示代码,这在交互式中工作:

#if INTERACTIVE
#r "../packages/FSharp.Data.2.0.4/lib/net40/FSharp.Data.dll"
#endif
open System
open FSharp.Data
let random = new Random()
let data = FreebaseData.GetDataContext()
let customerNames = data.Commons.Computers.``Computer Scientists``
let nameAmount = customerNames |> Seq.length
// ----
type Customer() =
    let person = customerNames |> Seq.nth (random.Next nameAmount)
    member x.Id = Guid.NewGuid()
    member x.Name = person.Name
    member x.RequiredTime = random.Next(10000)
type Barber(name) =
    member x.Name = name
type ``Possible actions notified to barber`` = 
| CustomerWalksIn of Customer
let availableCustomers = new MailboxProcessor<``Possible actions notified to barber``>(fun c -> async { () })
let createBarber name = 
    Console.WriteLine("Barber " + name + " takes inital nap...")
    let rec cutSomeHairs () = 
        async{
            do! availableCustomers.Scan(function 
                | CustomerWalksIn customer ->
                    async {
                        Console.WriteLine("Barber " + name + " is awake and started cutting " + customer.Name + "'s hair.")
                        // exception also happen with Threading.Thread.Sleep()
                        do! Async.Sleep customer.RequiredTime
                        Console.WriteLine("Barber " + name + " finnished cutting " + customer.Name + "'s hair. Going to sleep now...")
                    } |> Some)
            do! cutSomeHairs ()
            }
    cutSomeHairs() |> Async.StartImmediate
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
createBarber "Tuomas";
createBarber "Seppo";
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)

。运行一段时间后我得到的堆栈跟踪是:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.Sleep@1508-1.Invoke(Object state)
   at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
   at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
   at System.Threading.TimerQueueTimer.CallCallback()
   at System.Threading.TimerQueueTimer.Fire()
   at System.Threading.TimerQueue.FireNextTimers()
   at System.Threading.TimerQueue.AppDomainTimerCallback()
Stopped due to error

或没有线程的相同:

System.Exception: multiple waiting reader continuations for mailbox
   at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
   at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>.$Control.-ctor@511.Invoke(Object state)
Stopped due to error

MailboxProcessorReceiveScan方法应仅从代理的主体调用。引用 MSDN 文档:

此方法用于代理正文。对于每个代理,最多可以有一个并发读取器处于活动状态,因此对接收、尝试接收、扫描或 TryScan 的并发调用最多可以处于活动状态。扫描程序函数的主体在执行期间被锁定,但锁定在异步工作流执行之前释放。

因此,您需要以不同的方式构建代码。我没有详细的答案,但听起来我关于使用代理实现阻塞队列的文章可以在这里有所帮助。

正如 Tomas 已经指出的那样,MailboxProcessor直接只允许一个读取器,在异步系统中解决此问题的一种方法是编写自己的队列或邮箱类型。 然而,Tomas的文章指出的一件事没有谈到,实现新通信原语的另一种方法是使用Async.FromContinuations而不是MailboxProcessorAsyncReplyChannel

使用Async.FromContinuations的主要优点是您可以更直接地访问异步机制,而不必在MailboxProcessorAsyncReplyChannel施加的限制内工作。 主要缺点是您需要自己确保队列或邮箱线程的安全。

作为一个具体的例子,Anton Tayanovskyy的博客文章"使异步速度提高了5倍",其中包含使用Async.FromContinuations实现的多读写器同步通道的实现。

最新更新