F# 事件在异步工作流中不起作用



我想对代理进行一次 Post-Fire-Reply 。基本上,代理会触发一个事件,然后回复呼叫者。但是,我要么不断收到超时错误,要么事件无法正确触发。我尝试执行 Post-Fire,这停止了超时错误,但事件不会触发。

let evt = new Event<int>()
let stream = evt.Publish
type Agent<'T> = MailboxProcessor<'T>
type Fire = Fire of int
let agent = Agent.Start(fun inbox -> 
let rec loop() = async {
let! msg = inbox.Receive()
let (Fire i) = msg
evt.Trigger i }
loop())
let on i fn = 
stream 
|> Observable.filter (fun x -> x = i) 
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x) 
let rec collatz n =
printfn "%d" n
on n (fun i -> 
if (i % 2 = 0) then collatz (i/2)
else collatz (3*n + 1)) |> ignore
agent.Post (Fire n) // this does not work
// evt.Trigger n // this does works

collatz 13    

这是一个简单的实验,它重复创建一个函数来查找 Collatz 系列中的下一个数字,然后调用自身以返回该值,直到它达到 1。

似乎发生的事情是触发器只触发一次。我尝试尝试了我能想到的Async.RunSyncly/Async.Start/StartChild/SynchronizationContext的所有组合,但没有进展。我找到了一个类似于我正在做的博客,但这也没有帮助我

编辑谢谢费奥多尔·索金指出我的疏忽。最初的问题仍然存在,因为我希望同时触发事件并回复结果,但超时。

let evt = new Event<int>()
let stream = evt.Publish
type Agent<'T> = MailboxProcessor<'T>
type Command = 
| Fire of int
| Get of int * AsyncReplyChannel<int>
let agent = Agent.Start(fun inbox -> 
let rec loop() = async {
let! msg = inbox.Receive()
match msg with
| Fire i -> evt.Trigger i 
| Get (i,ch) -> 
evt.Trigger i 
ch.Reply(i)
return! loop() }
loop())
let on i fn = 
stream 
|> Observable.filter (fun x -> x = i) 
|> Observable.filter (fun x -> x <> 1)
|> Observable.subscribe (fun x -> fn x) 
let rec collatz n =
printfn "%d" n
on n (fun i -> 
if (i % 2 = 0) then collatz (i/2)
else collatz (3*n + 1)) |> ignore
agent.PostAndReply (fun ch -> (Get (n, ch))) |> ignore // timeout
agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.Ignore |> Async.Start // works but I need the result
agent.PostAndAsyncReply (fun ch -> (Get (n, ch))) |> Async.RunSynchronously |> ignore // timeout
collatz 13

您的loop函数不会循环。它接收第一条消息,触发事件,然后只是...出口。从不尝试接收第二条消息。

你需要使这个函数连续工作:处理第一条消息,然后立即返回以接收下一条消息,然后接收下一条消息,依此类推。喜欢这个:

let agent = Agent.Start(fun inbox -> 
let rec loop() = async {
let! msg = inbox.Receive()
let (Fire i) = msg
evt.Trigger i
return! loop() }
loop())
<小时 />

编辑

既然你已经达到了问题的限制,我将在这里回答你的编辑。

您在第二个代码段中超时的原因是代码中存在死锁。让我们跟踪执行以查看。

  1. 线程 1:代理已启动。
  2. 线程 2:第一个collatz调用。
  3. 线程 2:第一个collatz呼叫向座席发布消息。
  4. 线程 1:代理接收消息。
  5. 线程 1:代理触发事件。
  6. 线程 1:作为事件的结果,发生第二个collatz调用。
  7. 线程 1:第二个collatz呼叫向代理发布消息。
  8. 线程 1:第二个collatz呼叫开始等待代理响应。

这就是执行结束的地方。代理此时无法响应(事实上,它甚至无法接收下一条消息!),因为它的指令指针仍在evt.Trigger内。evt.Trigger调用尚未返回,因此loop函数尚未递归,因此尚未调用inbox.Receive函数,因此第二条消息仍在代理队列中等待。

因此,您会遇到一个经典的死锁:collatz正在等待代理接收其消息,但代理正在等待collatz完成事件处理。

对此最简单、最愚蠢的解决方案是异步触发事件:

async { evt.Trigger i } |> Async.Start

这将确保事件处理程序不是"就在那里"执行,而是异步执行,可能在不同的线程上。这反过来又允许代理在继续自己的执行循环之前不必等待事件被处理。

一般来说,在处理多线程和异步时,永远不应该直接调用未知代码。代理永远不应该直接调用evt.Trigger或它无法控制的任何其他内容,因为该代码可能正在等待代理本身(这就是您的情况),从而引入死锁。

相关内容

  • 没有找到相关文章

最新更新