我正在尝试运行一些邮箱处理器测试,似乎邮箱 Scan() 失败并显示"System.Exception:邮箱的多个等待读取器延续"。Async.Start 和 Async.StartImimmediate 等都会发生这种情况(Async.RunSyncly 也无法正常工作,因为那时只有一个处理器,在初始客户之后没有客户)。
这是演示代码,这在交互式中工作:
#if INTERACTIVE
#r "../packages/FSharp.Data.2.0.4/lib/net40/FSharp.Data.dll"
#endif
open System
open FSharp.Data
let random = new Random()
let data = FreebaseData.GetDataContext()
let customerNames = data.Commons.Computers.``Computer Scientists``
let nameAmount = customerNames |> Seq.length
// ----
type Customer() =
let person = customerNames |> Seq.nth (random.Next nameAmount)
member x.Id = Guid.NewGuid()
member x.Name = person.Name
member x.RequiredTime = random.Next(10000)
type Barber(name) =
member x.Name = name
type ``Possible actions notified to barber`` =
| CustomerWalksIn of Customer
let availableCustomers = new MailboxProcessor<``Possible actions notified to barber``>(fun c -> async { () })
let createBarber name =
Console.WriteLine("Barber " + name + " takes inital nap...")
let rec cutSomeHairs () =
async{
do! availableCustomers.Scan(function
| CustomerWalksIn customer ->
async {
Console.WriteLine("Barber " + name + " is awake and started cutting " + customer.Name + "'s hair.")
// exception also happen with Threading.Thread.Sleep()
do! Async.Sleep customer.RequiredTime
Console.WriteLine("Barber " + name + " finnished cutting " + customer.Name + "'s hair. Going to sleep now...")
} |> Some)
do! cutSomeHairs ()
}
cutSomeHairs() |> Async.StartImmediate
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
createBarber "Tuomas";
createBarber "Seppo";
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
availableCustomers.Post(new Customer() |> CustomerWalksIn)
。运行一段时间后我得到的堆栈跟踪是:
System.Exception: multiple waiting reader continuations for mailbox
at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
at <StartupCode$FSharp-Core>.$Control.Sleep@1508-1.Invoke(Object state)
at System.Threading.TimerQueueTimer.CallCallbackInContext(Object state)
at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx)
at System.Threading.TimerQueueTimer.CallCallback()
at System.Threading.TimerQueueTimer.Fire()
at System.Threading.TimerQueue.FireNextTimers()
at System.Threading.TimerQueue.AppDomainTimerCallback()
Stopped due to error
或没有线程的相同:
System.Exception: multiple waiting reader continuations for mailbox
at <StartupCode$FSharp-Core>.$Control.-ctor@2136-3.Invoke(AsyncParams`1 _arg1)
at <StartupCode$FSharp-Core>.$Control.loop@435-40(Trampoline this, FSharpFunc`2 action)
at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
at <StartupCode$FSharp-Core>.$Control.-ctor@511.Invoke(Object state)
Stopped due to error
MailboxProcessor
的Receive
和Scan
方法应仅从代理的主体调用。引用 MSDN 文档:
此方法用于代理正文。对于每个代理,最多可以有一个并发读取器处于活动状态,因此对接收、尝试接收、扫描或 TryScan 的并发调用最多可以处于活动状态。扫描程序函数的主体在执行期间被锁定,但锁定在异步工作流执行之前释放。
因此,您需要以不同的方式构建代码。我没有详细的答案,但听起来我关于使用代理实现阻塞队列的文章可以在这里有所帮助。
正如 Tomas 已经指出的那样,MailboxProcessor
直接只允许一个读取器,在异步系统中解决此问题的一种方法是编写自己的队列或邮箱类型。 然而,Tomas的文章指出的一件事没有谈到,实现新通信原语的另一种方法是使用Async.FromContinuations
而不是MailboxProcessor
和AsyncReplyChannel
。
使用Async.FromContinuations
的主要优点是您可以更直接地访问异步机制,而不必在MailboxProcessor
和AsyncReplyChannel
施加的限制内工作。 主要缺点是您需要自己确保队列或邮箱线程的安全。
作为一个具体的例子,Anton Tayanovskyy的博客文章"使异步速度提高了5倍",其中包含使用Async.FromContinuations
实现的多读写器同步通道的实现。