Fire-and-forget (without do!) vs. fire-and-wait (with do!): why such a huge performance difference?

The following code takes about 20 seconds to run. After uncommenting the do!, it takes less than a second. Why is there such a huge difference?

Update: using ag.Add takes about 9 seconds. I have updated the code accordingly.

open FSharpx.Control
let test () =
    let ag = new BlockingQueueAgent<int option>(500)
    let enqueue() = async {
        for i = 1 to 500 do
            //do! ag.AsyncAdd (Some i) // less than a second with do!
            ag.AsyncAdd (Some i)       // it takes about 20 seconds without do!
            //ag.Add (Some i)          // This one takes about 9 seconds
            //printfn "=> %d" i
    }
    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do ag.Add None
    } |> Async.Start
    let rec dequeue() =
        async {
            let! m = ag.AsyncGet()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }
    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    0

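Without do!, you are not waiting for the result of AsyncAdd. That means each call to enqueue() kicks off its 500 AsyncAdd operations as fast as it can. Each AsyncAdd call blocks when the queue is full. But if you don't wait for its result, your enqueue() code is never blocked, and it keeps starting new AsyncAdd operations.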
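A related general F# point: an Async value is "cold" and does nothing until it is run. If you drop it without do!, only the work a method performs *before* returning the Async takes effect.

The sketch below shows both cases with two hypothetical functions, coldWork and eagerThenWait. It is an illustration only, not how FSharpx implements AsyncAdd. The experiment further down shows tens of thousands of AsyncAdd messages piling up in the mailbox, which suggests the message is posted eagerly even when the returned Async is dropped.

```fsharp
// Hypothetical counters, used only for this illustration.
let mutable eagerSteps = 0
let mutable asyncRuns = 0

// A "cold" workflow: its body only executes when the Async is actually run.
let coldWork () = async { asyncRuns <- asyncRuns + 1 }

// Does one step eagerly (think: posting a message), then returns an Async
// that stands for "waiting for the reply".
let eagerThenWait () =
    eagerSteps <- eagerSteps + 1
    async { asyncRuns <- asyncRuns + 1 }

let withoutDoBang =
    async {
        for _ in 1 .. 3 do
            coldWork () |> ignore       // created and dropped: never runs
            eagerThenWait () |> ignore  // eager step runs; the returned Async does not
    }

let withDoBang =
    async {
        for _ in 1 .. 3 do
            do! coldWork ()             // runs to completion before the next iteration
    }

Async.RunSynchronously withoutDoBang
printfn "without do!: eagerSteps=%d asyncRuns=%d" eagerSteps asyncRuns  // eagerSteps=3 asyncRuns=0
Async.RunSynchronously withDoBang
printfn "with do!:    eagerSteps=%d asyncRuns=%d" eagerSteps asyncRuns  // eagerSteps=3 asyncRuns=3
```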

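Since you start 100 enqueue() operations in parallel, as many as 50,000 AsyncAdd operations could be trying to run at the same time. That would mean the thread pool is handling 49,500 blocked threads, which is a lot of demand on your system.

In practice, you won't start all 100 enqueue() operations at the same moment; you'll start as many enqueue() operations as you have logical CPUs. For the rest of this answer, I'll assume you have a quad-core processor with hyperthreading, as your question "F# Async.Parallel |> Async.RunSynchronously only uses one of the eight CPU cores?" seems to imply. That gives 8 logical CPUs, so you'll start eight copies of enqueue() before anything blocks. You'll then have 4,000 AsyncAdd threads running, 3,500 of which will be blocked.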

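On the other hand, when you use do!, your enqueue() operation also blocks whenever AsyncAdd is blocked, until a slot opens up in the queue. So once the queue holds 500 items, the thread pool will have 8 blocked AsyncAdd threads: one for each of the eight enqueue() operations running on the eight logical CPUs. Without do!, it would have 8*500 - 500 = 3,500 blocked AsyncAdd threads.

Eight blocked threads instead of 3,500 means the thread pool doesn't have to make 3,500 allocations. It also uses much less RAM and CPU time managing all those threads.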
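The counts in the two paragraphs above are simple arithmetic under the answer's model. That model treats each unawaited AsyncAdd as a blocked thread once the 500-slot queue is full, and it assumes 8 logical CPUs.

Here is a small F# sketch that reproduces those numbers. The binding names are illustrative, and the results are the model's estimates, not measurements.

```fsharp
// Inputs from the question's code, plus the answer's assumption of 8 logical CPUs.
let enqueueCalls = 100
let addsPerEnqueue = 500
let queueCapacity = 500
let logicalCpus = 8

// Worst case without do!: every AsyncAdd is in flight, and only queueCapacity of them succeed.
let worstCaseInFlight = enqueueCalls * addsPerEnqueue       // 50000
let worstCaseBlocked = worstCaseInFlight - queueCapacity    // 49500

// One wave of enqueue() calls (one per logical CPU) without do!.
let waveInFlight = logicalCpus * addsPerEnqueue             // 4000
let waveBlocked = waveInFlight - queueCapacity              // 3500

// With do!: each enqueue() waits, so at most one blocked AsyncAdd per running enqueue().
let blockedWithDoBang = logicalCpus                         // 8

printfn "without do!: worst case %d in flight, %d blocked" worstCaseInFlight worstCaseBlocked
printfn "without do!: one wave of %d enqueue() calls, %d in flight, %d blocked" logicalCpus waveInFlight waveBlocked
printfn "with do!: %d blocked" blockedWithDoBang
```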

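As I said in my answer to your previous question, you really do seem to need a deeper understanding of async operations. In addition to the articles I linked to in that answer (this article and this series), I'd also recommend reading https://medium.com/jettech/f-async-guide-eb3c8a2d180a. It is a fairly long and detailed guide to F# async and some of the "gotchas" you may run into.

I strongly recommend that you read those articles, then come back and look at your question again. With the deeper understanding you gain from them, you may be able to answer your own question!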

Following up on this question, here is an experiment based on the code above:

module Test.T1
open System
open System.Collections.Generic
open System.Diagnostics
type Msg<'T> =
    | AsyncAdd of 'T * AsyncReplyChannel<unit>
    | Add of 'T
    | AsyncGet of AsyncReplyChannel<'T>
let sw = Stopwatch()
let mutable scanned = 0          // messages inspected by the full-queue Scan since the last AsyncGet
let mutable scanTimeStart = 0L   // timestamp of the last Receive / full-queue Scan
// Bounded queue as a MailboxProcessor; explicit <'T> so it can be called as createQueue<int option>.
let createQueue<'T> (maxLength: int) = MailboxProcessor<Msg<'T>>.Start(fun inbox ->
    let queue = new Queue<'T>()
    // Empty queue: accept only adds; gets stay in the mailbox.
    let rec emptyQueue() =
        inbox.Scan(fun msg ->
            match msg with
            | AsyncAdd(value, reply) -> Some(enqueueAndContinueWithReply(value, reply))
            | Add(value) -> Some(enqueueAndContinue(value))
            | _ -> None)
    // Full queue: accept only gets; Scan has to step over every pending add.
    and fullQueue() =
        scanTimeStart <- sw.ElapsedMilliseconds
        inbox.Scan(fun msg ->
            scanned <- scanned + 1
            match msg with
            | AsyncGet(reply) -> Some(dequeueAndContinue(reply))
            | _ -> None)
    // Partly filled: process messages in arrival order.
    and runningQueue() = async {
        let! msg = inbox.Receive()
        scanTimeStart <- sw.ElapsedMilliseconds
        match msg with
        | AsyncAdd(value, reply) -> return! enqueueAndContinueWithReply(value, reply)
        | Add(value) -> return! enqueueAndContinue(value)
        | AsyncGet(reply) -> return! dequeueAndContinue(reply) }
    and enqueueAndContinueWithReply (value, reply) = async {
        reply.Reply()
        queue.Enqueue(value)
        return! chooseState() }
    and enqueueAndContinue (value) = async {
        queue.Enqueue(value)
        return! chooseState() }
    and dequeueAndContinue (reply) = async {
        let timestamp = sw.ElapsedMilliseconds
        printfn "[AsyncGet] messages cnt/scanned: %d/%d, timestamp/scanTime: %d/%d" inbox.CurrentQueueLength scanned timestamp (timestamp - scanTimeStart)
        scanned <- 0
        reply.Reply(queue.Dequeue())
        return! chooseState() }
    and chooseState() =
        if queue.Count = 0 then emptyQueue()
        elif queue.Count < maxLength then runningQueue()
        else fullQueue()
    emptyQueue())
let mb = createQueue<int option> 500
let addWithReply v = mb.PostAndAsyncReply(fun ch -> AsyncAdd(v, ch))
let addAndForget v = mb.Post(Add v)
let get() = mb.PostAndAsyncReply(AsyncGet)
[<EntryPoint>]
let main args =
    sw.Start()
    let enqueue() = async {
        for i = 1 to 500 do
            //do! addWithReply (Some i)       // wait for each reply (like do! ag.AsyncAdd)
            addWithReply (Some i) |> ignore   // fire and forget (like ag.AsyncAdd without do!)
            //addAndForget (Some i)           // plain Post (like ag.Add)
            //printfn "=> %d" i
    }
    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do addAndForget None
    } |> Async.Start
    let rec dequeue() =
        async {
            let! m = get()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }
    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    sw.Stop()
    printfn "Total elapsed: %dms" sw.ElapsedMilliseconds
    0

addWithReply is the equivalent of AsyncAdd. When we run it without do!, the output is (excerpt):

...
[AsyncGet] messages cnt/scanned: 48453/48450, timestamp/scanTime: 3755/6
[AsyncGet] messages cnt/scanned: 48452/48449, timestamp/scanTime: 3758/3
[AsyncGet] messages cnt/scanned: 48451/48448, timestamp/scanTime: 3761/3
[AsyncGet] messages cnt/scanned: 48450/48447, timestamp/scanTime: 3764/3
...

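As you can see, without do! you essentially put all 50,000 enqueue requests into the mailbox's message queue. The dequeuing workers are slower here, and their requests only get appended to the end of the message queue.

The last line of the output shows 48,450 messages in the mailbox while the item queue is full (500 items). To free up one slot, Scan has to inspect 48,447 messages, because they are all AsyncAdd rather than AsyncGet. scanTime, which approximates the time spent in MailboxProcessor.Scan, is 2-3 ms on my machine.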
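The key point is that Scan walks the mailbox in arrival order and leaves non-matching messages where they are. The sketch below uses a hypothetical ScanMsg type and scanCost function (not part of the experiment). It queues a backlog of Add messages in front of a single Get, then counts how many times the Scan callback runs before the Get is found. The count should be at least backlog + 1.

In the experiment, fullQueue() starts a fresh Scan after every dequeue while the backlog is still around 48,000 messages. That cost is therefore paid again for each AsyncGet.

```fsharp
// Hypothetical message type for this sketch (not the experiment's Msg<'T>).
type ScanMsg =
    | Add of int
    | Get of AsyncReplyChannel<int>   // replies with how many messages Scan inspected

/// Queues `pendingAdds` Add messages followed by one Get, starts an agent that
/// waits for a Get via Scan (as fullQueue() does), and returns how many messages
/// the scanner callback looked at before it matched the Get.
let scanCost (pendingAdds: int) : int =
    let inspected = ref 0
    let agent =
        new MailboxProcessor<ScanMsg>(fun inbox ->
            inbox.Scan(fun msg ->
                inspected.Value <- inspected.Value + 1
                match msg with
                | Get reply -> Some(async { reply.Reply inspected.Value })
                | Add _ -> None))     // skipped, but left in the mailbox
    // All Adds are posted before the agent starts, so the Get sits behind every one of them.
    for i in 1 .. pendingAdds do
        agent.Post(Add i)
    let reply = agent.PostAndAsyncReply(fun ch -> Get ch)
    agent.Start()
    Async.RunSynchronously reply

for backlog in [ 10; 1000; 50000 ] do
    printfn "backlog %6d -> scanner calls for one Get: %d" backlog (scanCost backlog)
```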

When we add do!, the message queue takes a different shape (see the output):

[AsyncGet] messages cnt/scanned: 98/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 96/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 104/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 102/96, timestamp/scanTime: 1561/0

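The number of messages in the message queue is roughly the number of enqueuing workers, because each of them is now waiting for its reply.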
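That bound can be checked in isolation. The sketch below uses a hypothetical maxBacklogWithDoBang function and a plain MailboxProcessor rather than the experiment's queue. Each producer awaits every reply with do!, so it can never have more than one message waiting in the mailbox. The backlog the agent observes should therefore stay below the number of producers, however many messages are sent in total.

```fsharp
/// Hypothetical sketch: `producers` workers each send `perProducer` messages and
/// await every reply with do!. Returns the largest mailbox backlog the agent
/// observed right after receiving a message.
let maxBacklogWithDoBang (producers: int) (perProducer: int) : int =
    let maxSeen = ref 0
    let agent =
        MailboxProcessor<int * AsyncReplyChannel<unit>>.Start(fun inbox ->
            let rec loop () = async {
                let! (_value, reply) = inbox.Receive()
                maxSeen.Value <- max maxSeen.Value inbox.CurrentQueueLength
                reply.Reply()
                return! loop () }
            loop ())
    let producer () = async {
        for i in 1 .. perProducer do
            // Awaiting the reply means this producer has at most one message in the mailbox.
            do! agent.PostAndAsyncReply(fun ch -> (i, ch)) }
    [ for _ in 1 .. producers -> producer () ]
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
    maxSeen.Value

printfn "max mailbox backlog with do! (100 producers): %d" (maxBacklogWithDoBang 100 50)
```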

What I can't yet explain from the experiment is this: when you change AsyncAdd to Add, you still flood the mailbox processor with messages:

[AsyncGet] messages cnt/scanned: 47551/47548, timestamp/scanTime: 3069/1
[AsyncGet] messages cnt/scanned: 47550/47547, timestamp/scanTime: 3070/1
[AsyncGet] messages cnt/scanned: 47549/47546, timestamp/scanTime: 3073/3
[AsyncGet] messages cnt/scanned: 47548/47545, timestamp/scanTime: 3077/2

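But the average scan time is about 1 ms, which is faster than with AsyncReplyChannel. My guess is that this has to do with how AsyncReplyChannel is implemented. It relies on ManualResetEvent, so internally there may be another queue of such events per process, and each AsyncGet would have to scan that queue when the AsyncReplyChannel is created.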
