F#MailboxProcessor-邮箱的多个等待读取器继续



我正在努力编写一个非常简单的异步测试框架。但我觉得我遇到了某种限制或错误。很抱歉,我无法在较小的代码库上复制此内容。

这是我提出的基本框架:

module TestRunner
open System
type TestOptions = {
Writer : ConsoleColor -> string -> unit}
type TestResults = {
Time : TimeSpan
Failure : exn option
}
type Test = {
Name : string
Finished : IEvent<TestResults>
SetFinished : TestResults -> unit
TestFunc : TestOptions -> Async<TestResults> }
let createTest name f =  
let ev = new Event<TestResults>()
{
Name = name 
Finished = ev.Publish
SetFinished = (fun res -> ev.Trigger res)
TestFunc = 
(fun options -> async {
let watch = System.Diagnostics.Stopwatch.StartNew()
try
do! f options
watch.Stop()
return { Failure = None; Time = watch.Elapsed }
with exn ->
watch.Stop()
return { Failure = Some exn; Time = watch.Elapsed }
})}
let simpleTest name f = 
createTest name (fun options -> f options.Writer)
/// Create a new Test and change the result
let mapResult mapping test = 
{ test with
TestFunc = 
(fun options -> async {
let! result = test.TestFunc options
return mapping result})}
let writeConsole color f = 
let old = System.Console.ForegroundColor
try
System.Console.ForegroundColor <- color
f()
finally
System.Console.ForegroundColor <- old
let printColor color (text:String) = 
writeConsole color (fun _ -> Console.WriteLine(text))

type WriterMessage = 
| NormalWrite of ConsoleColor * String
| StartTask of AsyncReplyChannel<int> * String
| WriteMessage of int * ConsoleColor * String
| EndTask of int
/// will handle printing jobs for two reasons
/// 1. Nice output grouped by tests (StartTask,WriteMessage,EndTask)
/// 2. Print Summary after all tests finished (NormalWrite)
let writer = MailboxProcessor.Start (fun inbox -> 
let currentTask = ref 0
let newHandle (returnHandle:AsyncReplyChannel<int>) = 
let handle = System.Threading.Interlocked.Increment currentTask
returnHandle.Reply handle
handle 
// the tasks describe which tasks are currently waiting to be processed
let rec loop tasks = async {
let! newTasks =
match tasks with
/// We process the Task with the number t and the name name
| (t, name) :: next -> 
inbox.Scan
(fun msg -> 
match msg with
| EndTask (endTask) -> 
// if the message is from the current task finish it
if t = endTask then
Some (async { return next })
else None
| WriteMessage(writeTask, color, message) ->
if writeTask = t then 
Some (async {
printColor color (sprintf "Task %s: %s" name message)
return tasks
})
else None
| StartTask (returnHandle, name) -> 
// Start any tasks instantly and add them to the list (because otherwise they would just wait for the resonse)
Some (async { 
let handle = newHandle returnHandle
return (List.append tasks [handle, name]) })
| _ -> None)
// No Current Tasks so just start ones or process the NormalWrite messages
| [] ->
inbox.Scan     
(fun msg -> 
match msg with
| StartTask (returnHandle, name) -> 
Some (async { 
let handle = newHandle returnHandle
return [handle, name] })
| NormalWrite(color, message) ->
Some (async {
printColor color message
return []
})
| _ -> None)   
return! loop newTasks 
}
loop [])
/// Write a normal message via writer
let writerWrite color (text:String) = 
writer.Post(NormalWrite(color, text))
/// A wrapper around the communication (to not miss EndTask for a StartTask)
let createTestWriter name f = async {
let! handle = writer.PostAndAsyncReply(fun reply -> StartTask(reply, name))
try
let writer color s = 
writer.Post(WriteMessage(handle,color,s))
return! f(writer)
finally
writer.Post (EndTask(handle))
}
/// Run the given test and print the results
let testRun t = async {
let! results = createTestWriter t.Name (fun writer -> async {
writer ConsoleColor.Green (sprintf "started")
let! results = t.TestFunc { Writer = writer }
match results.Failure with
| Some exn -> 
writer ConsoleColor.Red (sprintf "failed with %O" exn)
| None ->
writer ConsoleColor.Green (sprintf "succeeded!")
return results}) 
t.SetFinished results
}
/// Start the given task with the given amount of workers
let startParallelMailbox workerNum f = 
MailboxProcessor.Start(fun inbox ->
let workers = Array.init workerNum (fun _ -> MailboxProcessor.Start f)
let rec loop currentNum = async {
let! msg = inbox.Receive()
workers.[currentNum].Post msg
return! loop ((currentNum + 1) % workerNum)
}
loop 0 )
/// Runs all posted Tasks
let testRunner = 
startParallelMailbox 10 (fun inbox ->
let rec loop () = async {
let! test = inbox.Receive()
do! testRun test
return! loop()
}
loop ())
/// Start the given tests and print a sumary at the end
let startTests tests = async {
let! results =
tests 
|> Seq.map (fun t ->
let waiter = t.Finished |> Async.AwaitEvent
testRunner.Post t
waiter
)
|> Async.Parallel
let testTime = 
results
|> Seq.map (fun res -> res.Time)
|> Seq.fold (fun state item -> state + item) TimeSpan.Zero
let failed = 
results
|> Seq.map (fun res -> res.Failure) 
|> Seq.filter (fun o -> o.IsSome)
|> Seq.length
let testCount = results.Length
if failed > 0 then
writerWrite ConsoleColor.DarkRed (sprintf "--- %d of %d TESTS FAILED (%A) ---" failed testCount testTime)
else
writerWrite ConsoleColor.DarkGray (sprintf "--- %d TESTS FINISHED SUCCESFULLY (%A) ---" testCount testTime)
}

现在,只有当我使用一组特定的测试时,才会触发Exception它们在网络上进行一些爬行(有些失败,有些没有,这很好):

#r @"Yaaf.GameMediaManager.Primitives.dll";; // See below
open TestRunner
let testLink link =
Yaaf.GameMediaManager.EslGrabber.getMatchMembers link
|> Async.Ignore
let tests = [
// Some working links (links that should work)
yield! 
[ //"TestMatch", "http://www.esl.eu/eu/wire/anti-cheat/css/anticheat_test/match/26077222/"
"MatchwithCheater", "http://www.esl.eu/de/csgo/ui/versus/match/3035028"
"DeletedAccount", "http://www.esl.eu/de/css/ui/versus/match/2852106" 
"CS1.6", "http://www.esl.eu/de/cs/ui/versus/match/2997440" 
"2on2Versus", "http://www.esl.eu/de/css/ui/versus/match/3012767" 
"SC2cup1on1", "http://www.esl.eu/eu/sc2/go4sc2/cup230/match/26964055/"
"CSGO2on2Cup", "http://www.esl.eu/de/csgo/cups/2on2/season_08/match/26854846/"
"CSSAwpCup", "http://www.esl.eu/eu/css/cups/2on2/awp_cup_11/match/26811005/"
] |> Seq.map (fun (name, workingLink) -> simpleTest (sprintf "TestEslMatches_%s" name) (fun o -> testLink workingLink))
]
startTests tests |> Async.Start;; // this will produce the Exception now and then

https://github.com/matthid/Yaaf.GameMediaManager/blob/core/src/Yaaf.GameMediaManager.Primitives/EslGrabber.fs是代码,您可以下载https://github.com/downloads/matthid/Yaaf.GameMediaManager/GameMediaManager.%200.9.3.1.wireplugin(这基本上是一个重命名的zip档案)并提取它以获得Yaaf。GameMediaManager。Primitives.dll二进制(你可以把它粘贴到FSI中,而不是在你想要的时候下载,但你必须参考HtmlAgilityPack)

我可以用微软(R)F#2.0 Interactive版本4.0.40219.1复制这一点。问题是,异常不会总是被触发(但非常频繁),并且堆栈竞赛没有告诉我

System.Exception: multiple waiting reader continuations for mailbox
bei <StartupCode$FSharp-Core>.$Control.-ctor@1860-3.Invoke(AsyncParams`1 _arg11)
bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
bei <StartupCode$FSharp-Core>.$Control.finishTask@1280[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 firstExn, T[] results, TrampolineHolder trampolineHolder, Int32 remaining)
bei <StartupCode$FSharp-Core>.$Control.recordFailure@1302[T](AsyncParams`1 _arg3, AsyncParamsAux aux, FSharpRef`1 count, FSharpRef`1 firstExn, T[] results, LinkedSubSource innerCTS, TrampolineHolder trampolineHolder, FSharpChoice`2 exn)
bei <StartupCode$FSharp-Core>.$Control.Parallel@1322-3.Invoke(Exception exn)
bei Microsoft.FSharp.Control.AsyncBuilderImpl.protectedPrimitive@690.Invoke(AsyncParams`1 args)
bei <StartupCode$FSharp-Core>.$Control.loop@413-40(Trampoline this, FSharpFunc`2 action)
bei Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
bei Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
bei <StartupCode$FSharp-Core>.$Control.-ctor@473-1.Invoke(Object state)
bei System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
bei System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
bei System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
bei System.Threading.ThreadPoolWorkQueue.Dispatch()
bei System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

因为这将在我无法控制的工作线程上触发,所以这将使应用程序崩溃(不是FSI,但异常也将显示在这里)。

我发现http://cs.hubfs.net/topic/Some/2/59152和http://cs.hubfs.net/topic/None/59146但我不使用StartChild,我不认为我同时从多个线程调用Receive?

我的代码有什么问题吗?或者这确实是一个错误吗?如果可能的话,我该如何解决?

我注意到,在FSI中,当异常被静默忽略时,所有测试都将按预期运行。我该怎么做?

编辑:我注意到,在我修复了失败的单元测试后,它将正常工作。然而,我仍然无法用较小的代码库来重现这一点。例如,我自己的失败测试。

谢谢,matthid

我的感觉是限制在MailboxProcessor本身,而不是异步。

老实说,我对扫描功能过于谨慎。我写了一篇关于使用它们的危险性的博客文章。

是否可以使用标准接收机制而不是使用扫描功能来处理任务?

需要注意的是,在async中使用了蹦床,这样同一个线程就可以重复使用一段时间,以避免不必要的线程池使用(我认为这设置为300),所以在调试时,您可能会看到这种行为。

我会以稍微不同的方式处理这个问题,将单独的组件分解为管道阶段,而不是嵌套的异步块。我会创建一个主管组件和路由组件。

Supervisor将负责初始测试,并将消息发布到路由组件,路由组件将请求循环发送到其他代理。任务完成后,他们可以向主管反馈。

我意识到这并不能真正解决当前代码中的问题,但我认为无论如何都必须分解问题,以便调试系统的异步部分。

我确实认为Scan/TryScan/Receive的2.0实现中存在一个错误,可能会错误地导致

multiple waiting reader continuations for mailbox 

例外;我认为这个bug现在已经在3.0实现中修复了。我没有仔细查看您的代码,试图确保您在实现中一次只接收一条消息,所以这也可能是您代码中的一个错误。如果你能在F#3.0的比赛中尝试一下,如果这种情况消失了,那就太好了。

遗憾的是,我从未真正能够在较小的代码库上重现这一点,现在我将使用支持异步测试的NUnit,而不是我自己的实现。从那以后,我在各种项目中使用了代理(MailboxProcessor)和异步,再也没有遇到过这种情况。。。

一些注意事项,以防有人发现我的经验有用(调试多个进程需要很长时间才能找到问题):

执行和吞吐量开始被50个代理/邮箱堵塞。有时,在轻负载的情况下,它可以用于第一轮消息,但任何像调用日志库这样重要的事情都会触发更长的延迟。

使用VS IDE中的Threads/Parallel Stacks窗口进行调试,运行时正在等待FSharpAsync。同步运行->CancellationTokenOps。蹦床的RunSynchronously呼叫。执行操作

我怀疑底层的ThreadPool正在限制启动(在第一次运行正常之后)。这是一个很长的延迟。我使用代理在某些队列中串行化次要计算,同时允许主调度代理保持响应,因此延迟在CLR中。

我发现,在尝试中运行MailboxProcessor Receive with Timeout,可以停止延迟,但这需要封装在异步块中,以阻止程序的其余部分放慢速度,无论延迟多么短。尽管有点无聊,但我对F#MailboxProcessor实现演员模型感到非常满意。

相关内容

  • 没有找到相关文章

最新更新