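The following code takes about 20 seconds to run. However, if I uncomment the do!, it takes less than a second. Why is there such a huge difference?
Update: using ag.Add takes about 9 seconds. I have updated the code.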
open FSharpx.Control
let test () =
    let ag = new BlockingQueueAgent<int option>(500)
    let enqueue() = async {
        for i = 1 to 500 do
            //do! ag.AsyncAdd (Some i) // less than a second with do!
            ag.AsyncAdd (Some i) // it takes about 20 seconds without do!
            //ag.Add (Some i) // This one takes about 9 seconds
            //printfn "=> %d" i
        }
    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do ag.Add None
    } |> Async.Start
    let rec dequeue() =
        async {
            let! m = ag.AsyncGet()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }
    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    0
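Without do!, you don't wait for the result of AsyncAdd. That means each call to enqueue() kicks off its 500 AsyncAdd operations as fast as it can. Each AsyncAdd call blocks if the queue is full. However, if you don't wait for the result of AsyncAdd, your enqueue() code is not blocked, and it keeps starting new AsyncAdd operations.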
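Because you start 100 enqueue() operations in parallel, up to fifty thousand AsyncAdd operations could be trying to run at the same time. That means the thread pool is handling 49,500 blocked threads, which is a big demand on your system. In practice you won't start all 100 enqueue() operations at once. Instead, you'll start as many enqueue() operations as you have logical CPUs.

For the rest of this answer, I'll assume you have a quad-core processor with hyperthreading, as your question "F# Async.Parallel |> Async.RunSynchronously only uses one of the eight CPU cores?" seems to imply. That gives 8 logical CPUs, so you'll start eight copies of enqueue() before anything blocks. You will then have 4,000 AsyncAdd threads running, 3,500 of which will be blocked.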
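The counts above come straight from the sizes in the question's code (100 enqueue() calls, 500 items each, queue capacity 500) and the assumed 8 logical CPUs:

```latex
\begin{aligned}
\text{all 100 at once:}\quad & 100 \times 500 = 50{,}000 \text{ AsyncAdd calls}, & 50{,}000 - 500 &= 49{,}500 \text{ blocked} \\
\text{8 logical CPUs:}\quad & 8 \times 500 = 4{,}000 \text{ AsyncAdd calls}, & 4{,}000 - 500 &= 3{,}500 \text{ blocked}
\end{aligned}
```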
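On the other hand, when you use do!, your enqueue() operation also blocks whenever AsyncAdd is blocked, until a slot opens up in the queue. So once there are 500 items in the queue, the thread pool holds only eight blocked AsyncAdd threads, one for each of the eight enqueue() operations running on the eight logical CPUs. Without do!, it would hold 3,500 (8*500 - 500 = 3,500) blocked AsyncAdd threads. With eight blocked threads instead of 3,500, the thread pool doesn't make 3,500 allocations. It also uses far less RAM and CPU time to manage all those threads.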
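The back-pressure that do! gives you can be sketched with a plain MailboxProcessor. This sketch does not use FSharpx's BlockingQueueAgent. It assumes BlockingQueueAgent behaves similarly, withholding its reply to AsyncAdd while the queue is full. The BufMsg type and createBuffer function are hypothetical names used for illustration only. A Put is answered only when there is room, so an enqueuer that awaits it is suspended until a consumer frees a slot:

```fsharp
open System.Collections.Generic

// Hypothetical message type for this sketch; not the FSharpx API.
type BufMsg =
    | Put of int * AsyncReplyChannel<unit>
    | Take of AsyncReplyChannel<int>

/// Hypothetical bounded buffer: a Put is only answered once there is room for it.
let createBuffer (capacity: int) =
    MailboxProcessor<BufMsg>.Start(fun inbox ->
        let items = Queue<int>()
        let rec loop () =
            inbox.Scan(fun msg ->
                match msg with
                | Put (v, reply) when items.Count < capacity ->
                    // Room available: store the item and release the producer.
                    Some(async {
                        items.Enqueue v
                        reply.Reply()
                        return! loop () })
                | Take reply when items.Count > 0 ->
                    // An item is available: hand the oldest one to the consumer.
                    Some(async {
                        reply.Reply(items.Dequeue())
                        return! loop () })
                | _ ->
                    // Otherwise leave the message in the mailbox for a later Scan.
                    None)
        loop ())

let buf = createBuffer 2
buf.PostAndReply(fun ch -> Put (1, ch))
buf.PostAndReply(fun ch -> Put (2, ch))

// The buffer is now full, so this Put is not answered; an enqueuer that
// awaited it with do! would stay suspended at this point.
let pending = Async.StartAsTask(buf.PostAndAsyncReply(fun ch -> Put (3, ch)))
System.Threading.Thread.Sleep 200
let completedWhileFull = pending.IsCompleted
printfn "third Put completed while the buffer was full: %b" completedWhileFull

// Taking one item frees a slot, so the waiting Put is accepted and completes.
let first = buf.PostAndReply Take
let completedAfterTake = pending.Wait(1000)
printfn "took %d; third Put completed afterwards: %b" first completedAfterTake
```

Note that a rejected Put stays in the mailbox and is re-examined on every later Scan. That cost grows with the number of waiting messages.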
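As I said in my answer to your previous question, it really seems like you need a deeper understanding of async operations. Besides the articles I linked to in that answer (this article and this series), I'd also recommend reading https://medium.com/jettech/f-async-guide-eb3c8a2d180a, a fairly long and detailed guide to F# async and some of the "gotchas" you might run into. I strongly suggest reading those articles and then coming back to look at your question again. With the deeper understanding you'll gain from them, you may well be able to answer your own question!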
Following up on this question, here is an experiment based on the code:
// Learn more about F# at http://fsharp.org
module Test.T1
open System
open System.Collections.Generic
open System.Diagnostics
type Msg<'T> =
    | AsyncAdd of 'T * AsyncReplyChannel<unit>
    | Add of 'T
    | AsyncGet of AsyncReplyChannel<'T>
let sw = Stopwatch()
let mutable scanned = 0
let mutable scanTimeStart = 0L
let createQueue maxLength = MailboxProcessor.Start(fun inbox ->
    let queue = new Queue<'T>()
    let rec emptyQueue() =
        inbox.Scan(fun msg ->
            match msg with
            | AsyncAdd(value, reply) -> Some(enqueueAndContinueWithReply(value, reply))
            | Add(value) -> Some(enqueueAndContinue(value))
            | _ -> None )
    and fullQueue() =
        scanTimeStart <- sw.ElapsedMilliseconds
        inbox.Scan(fun msg ->
            scanned <- scanned + 1
            match msg with
            | AsyncGet(reply) ->
                Some(dequeueAndContinue(reply))
            | _ -> None )
    and runningQueue() = async {
        let! msg = inbox.Receive()
        scanTimeStart <- sw.ElapsedMilliseconds
        match msg with
        | AsyncAdd(value, reply) -> return! enqueueAndContinueWithReply(value, reply)
        | Add(value) -> return! enqueueAndContinue(value)
        | AsyncGet(reply) -> return! dequeueAndContinue(reply) }
    and enqueueAndContinueWithReply (value, reply) = async {
        reply.Reply()
        queue.Enqueue(value)
        return! chooseState() }
    and enqueueAndContinue (value) = async {
        queue.Enqueue(value)
        return! chooseState() }
    and dequeueAndContinue (reply) = async {
        let timestamp = sw.ElapsedMilliseconds
        printfn "[AsyncGet] messages cnt/scanned: %d/%d, timestamp/scanTime: %d/%d" inbox.CurrentQueueLength scanned timestamp (timestamp - scanTimeStart)
        scanned <- 0
        reply.Reply(queue.Dequeue())
        return! chooseState() }
    and chooseState() =
        if queue.Count = 0 then emptyQueue()
        elif queue.Count < maxLength then runningQueue()
        else fullQueue()
    emptyQueue())
let mb = createQueue<int option> 500
let addWithReply v = mb.PostAndAsyncReply(fun ch -> AsyncAdd(v, ch))
let addAndForget v = mb.Post(Add v)
let get() = mb.PostAndAsyncReply(AsyncGet)
[<EntryPoint>]
let main args =
    sw.Start()
    let enqueue() = async {
        for i = 1 to 500 do
            //do! addWithReply (Some i) // less than a second with do!
            addWithReply (Some i) // it takes about 20 seconds without do!
            //addAndForget(Some i)
            //ag.Add (Some i) // This one takes about 9 seconds
            //printfn "=> %d" i
        }
    async {
        do! [ for i = 1 to 100 do yield enqueue() ]
            |> Async.Parallel |> Async.Ignore
        for i = 1 to 5 do addAndForget None
    } |> Async.Start
    let rec dequeue() =
        async {
            let! m = get()
            match m with
            | Some v ->
                //printfn "<= %d" v
                return! dequeue()
            | None ->
                printfn "Done"
        }
    [ for i = 1 to 5 do yield dequeue() ]
    |> Async.Parallel |> Async.Ignore |> Async.RunSynchronously
    sw.Stop()
    printfn "Total elapsed: %dms" sw.ElapsedMilliseconds
    0
addWithReply corresponds to AsyncAdd. When we run it without do!, the output is (excerpt):
...
[AsyncGet] messages cnt/scanned: 48453/48450, timestamp/scanTime: 3755/6
[AsyncGet] messages cnt/scanned: 48452/48449, timestamp/scanTime: 3758/3
[AsyncGet] messages cnt/scanned: 48451/48448, timestamp/scanTime: 3761/3
[AsyncGet] messages cnt/scanned: 48450/48447, timestamp/scanTime: 3764/3
...
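As you can see, without do! you essentially put all 50,000 enqueue requests into the mailbox's message queue. The dequeuing threads are slower here, and they can only place their requests at the end of the message queue. The last line of the output shows 48,450 messages in the mailbox while the item queue is full (500 items). To free up one slot, we need to scan 48,447 messages, because they are all AsyncAdd rather than AsyncGet. scanTime is 2-3 ms on my machine, which is roughly the time spent in MailboxProcessor.Scan.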
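The scanning cost comes from how Scan treats the messages it rejects: they stay in the mailbox. As far as I understand MailboxProcessor, every new Scan call starts again from the oldest of them. Here is a small self-contained sketch of that effect. The ScanMsg type is hypothetical. The sketch floods a mailbox and then asks for a message that sits behind the flood:

```fsharp
// Hypothetical messages for this sketch: Put is never consumed by the scan,
// Count asks how many messages are still sitting in the mailbox.
type ScanMsg =
    | Put of int
    | Count of AsyncReplyChannel<int>

let agent =
    MailboxProcessor<ScanMsg>.Start(fun inbox ->
        let rec loop () =
            // Scan inspects messages from the oldest forward and leaves every
            // message it rejects (returns None for) in the mailbox.
            inbox.Scan(fun msg ->
                match msg with
                | Count reply ->
                    Some(async {
                        reply.Reply(inbox.CurrentQueueLength)
                        return! loop () })
                | Put _ -> None)
        loop ())

// Flood the mailbox first, as the un-awaited enqueuers do.
for i in 1 .. 10000 do
    agent.Post(Put i)

// The Count request lands behind all of them, so Scan has to step over
// 10,000 Put messages to reach it, and they all remain queued afterwards.
let remaining = agent.PostAndReply Count
printfn "messages still in the mailbox: %d" remaining   // 10000
```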
When we add do!, the message queue has a different shape (see the output):
[AsyncGet] messages cnt/scanned: 98/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 96/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 104/96, timestamp/scanTime: 1561/0
[AsyncGet] messages cnt/scanned: 102/96, timestamp/scanTime: 1561/0
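The number of messages in the message queue is roughly the number of enqueuing threads, because each of them is now waiting for its reply.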
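The reason the backlog tracks the number of enqueuers is that a producer which awaits each reply with do! can have at most one request in the mailbox at a time. The following sketch illustrates this with a plain MailboxProcessor. The Req and MaxBacklog names are hypothetical, and the count of 8 producers mirrors the 8 logical CPUs assumed earlier. The agent records the largest backlog it ever sees right after taking a message:

```fsharp
// Hypothetical request type for this sketch.
type Req =
    | Put of int * AsyncReplyChannel<unit>
    | MaxBacklog of AsyncReplyChannel<int>

let agent =
    MailboxProcessor<Req>.Start(fun inbox ->
        let rec loop maxSeen = async {
            let! msg = inbox.Receive()
            // How many other messages are waiting right after taking this one.
            let maxSeen = max maxSeen inbox.CurrentQueueLength
            match msg with
            | Put (_, reply) ->
                reply.Reply()
                return! loop maxSeen
            | MaxBacklog reply ->
                reply.Reply(maxSeen)
                return! loop maxSeen }
        loop 0)

let producers = 8

// Each producer awaits every reply with do!, so it never has more than
// one Put outstanding in the mailbox.
let producer = async {
    for i in 1 .. 500 do
        do! agent.PostAndAsyncReply(fun ch -> Put (i, ch)) }

List.replicate producers producer
|> Async.Parallel
|> Async.Ignore
|> Async.RunSynchronously

let backlog = agent.PostAndReply MaxBacklog
printfn "largest backlog seen: %d (bound: %d)" backlog (producers - 1)
```

Whatever value is printed, it cannot exceed producers - 1. In contrast, the un-awaited version above leaves tens of thousands of messages queued.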
What I can't yet explain from the experiment is that when you change AsyncAdd to Add, you still flood the mailbox processor:
[AsyncGet] messages cnt/scanned: 47551/47548, timestamp/scanTime: 3069/1
[AsyncGet] messages cnt/scanned: 47550/47547, timestamp/scanTime: 3070/1
[AsyncGet] messages cnt/scanned: 47549/47546, timestamp/scanTime: 3073/3
[AsyncGet] messages cnt/scanned: 47548/47545, timestamp/scanTime: 3077/2
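However, the average scan time is ~1 ms, which is faster than with AsyncReplyChannel. My guess is that this has to do with how AsyncReplyChannel is implemented. It relies on ManualResetEvent, so internally there may be another queue of such events per process, and each AsyncGet would have to scan that queue when an AsyncReplyChannel is created.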