The compiled console command-line program doesn't wait for all the threads to finish



If the code is compiled as a console program, or run as fsi --use:Program --exec --quiet, some threads are terminated before they finish. Is there any way to wait until all the threads end?

The problem can be described as "the program-exit problem when there are multiple MailboxProcessors".

Example output

(Note that the last line is truncated and that the final output call, printfn "[Main] after crawl", never executes.)

[Main] before crawl
[Crawl] before return result
http://news.google.com crawled by agent 1.
[supervisor] reached limit
Agent 5 is done.
http://www.gstatic.com/news/img/favicon.ico crawled by agent 1.
[supervisor] reached limit
Agent 1 is done.
http://www.google.com/imghp?hl=en&tab=ni crawled by agent 4.
[supervisor] reached limit
Agent 4 is done.
http://www.google.com/webhp?hl=en&tab=nw crawled by agent 2.
[supervisor] reached limit
Agent 2 is done.
http://news.google

Code

Edit: added a few System.Threading.Thread.CurrentThread.IsBackground <- false calls.

open System
open System.Collections.Concurrent
open System.Collections.Generic
open System.IO
open System.Net
open System.Text.RegularExpressions
module Helpers =
    type Message =
        | Done
        | Mailbox of MailboxProcessor<Message>
        | Stop
        | Url of string option
        | Start of AsyncReplyChannel<unit>
    // Gates the number of crawling agents.
    [<Literal>]
    let Gate = 5
    // Extracts links from HTML.
    let extractLinks html =
        let pattern1 = "(?i)href\\s*=\\s*(\"|')/?((?!#.*|/\\B|" + 
                       "mailto:|location\\.|javascript:)[^\"']+)(\"|')"
        let pattern2 = "(?i)^https?"
        let links =
            [
                for x in Regex(pattern1).Matches(html) do
                    yield x.Groups.[2].Value
            ]
            |> List.filter (fun x -> Regex(pattern2).IsMatch(x))
        links
    // Fetches a Web page.
    let fetch (url : string) =
        try
            let req = WebRequest.Create(url) :?> HttpWebRequest
            req.UserAgent <- "Mozilla/5.0 (Windows; U; MSIE 9.0; Windows NT 9.0; en-US)"
            req.Timeout <- 5000
            use resp = req.GetResponse()
            let content = resp.ContentType
            let isHtml = Regex("html").IsMatch(content)
            match isHtml with
            | true -> use stream = resp.GetResponseStream()
                      use reader = new StreamReader(stream)
                      let html = reader.ReadToEnd()
                      Some html
            | false -> None
        with
        | _ -> None
    let collectLinks url =
        let html = fetch url
        match html with
        | Some x -> extractLinks x
        | None -> []
open Helpers
// Creates a mailbox that synchronizes printing to the console (so 
// that two calls to 'printfn' do not interleave when printing)
let printer = 
    MailboxProcessor.Start(fun x -> async {
        while true do
            let! str = x.Receive()
            System.Threading.Thread.CurrentThread.IsBackground <- false
            printfn "%s" str })
// Hides standard 'printfn' function (formats the string using 
// 'kprintf' and then posts the result to the printer agent).
let printfn fmt = 
    Printf.kprintf printer.Post fmt
let crawl url limit = 
    // Concurrent queue for saving collected urls.
    let q = ConcurrentQueue<string>()
    // Holds crawled URLs.
    let set = HashSet<string>()

    let supervisor =
        MailboxProcessor.Start(fun x -> async {
            System.Threading.Thread.CurrentThread.IsBackground <- false
            // The agent expects to receive 'Start' message first - the message
            // carries a reply channel that is used to notify the caller
            // when the agent completes crawling.
            let! start = x.Receive()
            let repl =
              match start with
              | Start repl -> repl
              | _ -> failwith "Expected Start message!"
            let rec loop run =
                async {
                    let! msg = x.Receive()
                    match msg with
                    | Mailbox(mailbox) -> 
                        let count = set.Count
                        if count < limit - 1 && run then 
                            let url = q.TryDequeue()
                            match url with
                            | true, str -> if not (set.Contains str) then
                                                let set'= set.Add str
                                                mailbox.Post <| Url(Some str)
                                                return! loop run
                                            else
                                                mailbox.Post <| Url None
                                                return! loop run
                            | _ -> mailbox.Post <| Url None
                                   return! loop run
                        else
                            printfn "[supervisor] reached limit" 
                            // Wait for finishing
                            mailbox.Post Stop
                            return! loop run
                    | Stop -> printfn "[Supervisor] stop"; return! loop false
                    | Start _ -> failwith "Unexpected start message!"
                    | Url _ -> failwith "Unexpected URL message!"
                    | Done -> printfn "[Supervisor] Supervisor is done."
                              (x :> IDisposable).Dispose()
                              // Notify the caller that the agent has completed
                              repl.Reply(())
                }
            do! loop true })

    let urlCollector =
        MailboxProcessor.Start(fun y ->
            let rec loop count =
                async {
                    System.Threading.Thread.CurrentThread.IsBackground <- false
                    let! msg = y.TryReceive(6000)
                    match msg with
                    | Some message ->
                        match message with
                        | Url u ->
                            match u with
                            | Some url -> q.Enqueue url
                                          return! loop count
                            | None -> return! loop count
                        | _ ->
                            match count with
                            | Gate -> (y :> IDisposable).Dispose()
                                      printfn "[urlCollector] URL collector is done."
                                      supervisor.Post Done
                            | _ -> return! loop (count + 1)
                    | None -> supervisor.Post Stop
                              return! loop count
                }
            loop 1)
    /// Initializes a crawling agent.
    let crawler id =
        MailboxProcessor.Start(fun inbox ->
            let rec loop() =
                async {
                    System.Threading.Thread.CurrentThread.IsBackground <- false
                    let! msg = inbox.Receive()
                    match msg with
                    | Url x ->
                        match x with
                        | Some url -> 
                                let links = collectLinks url
                                printfn "%s crawled by agent %d." url id
                                for link in links do
                                    urlCollector.Post <| Url (Some link)
                                supervisor.Post(Mailbox(inbox))
                                return! loop()
                        | None -> supervisor.Post(Mailbox(inbox))
                                  return! loop()
                    | _ -> printfn "Agent %d is done." id
                           urlCollector.Post Done
                           (inbox :> IDisposable).Dispose()
                    }
            loop())
    // Send 'Start' message to the main agent. The result
    // is asynchronous workflow that will complete when the
    // agent crawling completes
    let result = supervisor.PostAndAsyncReply(Start)
    // Spawn the crawlers.
    let crawlers = 
        [
            for i in 1 .. Gate do
                yield crawler i
        ]
    // Post the first messages.
    crawlers.Head.Post <| Url (Some url)
    crawlers.Tail |> List.iter (fun ag -> ag.Post <| Url None) 
    printfn "[Crawl] before return result"
    result
// Example:
printfn "[Main] before crawl"
crawl "http://news.google.com" 5
|> Async.RunSynchronously
printfn "[Main] after crawl"

If I identify the code correctly, it is based on your previous question (and my answer to it).

The program waits until the supervisor agent completes (by sending it the Start message first and then waiting for the reply using RunSynchronously). This guarantees that the main agent as well as all the crawlers complete before the application exits.
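
A condensed, self-contained sketch of that waiting mechanism (the Msg type and the immediate reply are illustrative; the full listing above does the real crawling work before replying):

type Msg =
    | Start of AsyncReplyChannel<unit>

let supervisor =
    MailboxProcessor.Start(fun inbox -> async {
        // The first message carries the reply channel used to signal completion.
        let! (Start reply) = inbox.Receive()
        // ... the actual crawling work would happen here ...
        reply.Reply () })

// Blocks the caller until the supervisor replies, i.e. until it has finished.
supervisor.PostAndAsyncReply Start |> Async.RunSynchronously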

The problem is that it does not wait until the printer agent completes! So the last call to the (redefined) printfn function sends a message to that agent, and then the application finishes without waiting until the printing agent is done.

As far as I know, there is no "standard pattern" for waiting until an agent has finished processing all the messages currently in its queue. Some ideas you could try:

  • You could check the CurrentQueueLength property (and wait until it is 0), but that still does not mean the agent has finished processing all of the messages.

  • You could make the agent more complex by adding a new kind of message and waiting until the agent replies to that message (just as you are currently waiting for the reply to the Start message); a sketch of this idea follows right after this list.
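
A minimal sketch of the second option, reusing the printer/printfn names from the code above. The PrinterMessage type and the Shutdown message are hypothetical additions, not part of the original code:

// Hypothetical message type: 'Shutdown' carries a reply channel so the
// caller can block until everything queued before it has been printed.
type PrinterMessage =
    | Print of string
    | Shutdown of AsyncReplyChannel<unit>

let printer =
    MailboxProcessor.Start(fun inbox ->
        let rec loop () = async {
            let! msg = inbox.Receive()
            match msg with
            | Print s ->
                System.Console.WriteLine s
                return! loop ()
            | Shutdown reply ->
                // Messages are handled in order, so everything posted
                // before 'Shutdown' has already been printed at this point.
                reply.Reply () }
        loop ())

let printfn fmt =
    Printf.kprintf (fun s -> printer.Post (Print s)) fmt

// At the very end of the program, block until the printer's queue is drained:
// printer.PostAndReply Shutdown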

Note that I do not know F#, but normally you use Thread.Join to wait on all the threads you are interested in. It seems to me that in your case you need to wait on whatever of interest gets kicked off by calling .Start.

You could also consider the Task Parallel Library, which gives you a higher-level (and simpler) abstraction over raw managed threads. An example of waiting for tasks to complete is sketched below.
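
A minimal F# sketch of that suggestion (the work function and the task count are illustrative only, not taken from the question):

open System
open System.Threading.Tasks

// Illustrative workload; any side-effecting function would do here.
let work (i : int) =
    printfn "task %d finished" i

// Start a few tasks; the explicit Action forces the overload that
// returns a plain (non-generic) Task.
let tasks =
    [| for i in 1 .. 5 -> Task.Factory.StartNew(Action(fun () -> work i)) |]

// WaitAll blocks the calling thread until every task has completed,
// so the process cannot exit while work is still in flight.
Task.WaitAll(tasks)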

.NET threads have a Thread.IsBackground property. When it is set to true, the thread does not prevent the process from exiting. When it is set to false, it does prevent the process from exiting. See: http://msdn.microsoft.com/en-us/library/system.threading.thread.isbackground.aspx

The threads that run the agents come from the thread pool and therefore have Thread.IsBackground set to true by default.

You can try setting the thread's IsBackground to false every time a message is read. You could add a function to do this to make the approach cleaner. This is probably not the best solution to the problem, because every let! can resume on a different thread, so it would need to be implemented carefully to work correctly. I just thought I would mention it to answer the specific question

"Is there any way to wait until all the threads end?"

and to help people understand why some threads keep the program from exiting while others do not.
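
A minimal sketch of that idea; the keepAlive helper and the echoAgent name are illustrative, not part of the original code:

// Hypothetical helper: mark whatever thread the agent is currently
// running on as a foreground thread so it keeps the process alive.
let keepAlive () =
    System.Threading.Thread.CurrentThread.IsBackground <- false

let echoAgent =
    MailboxProcessor<string>.Start(fun inbox -> async {
        while true do
            let! msg = inbox.Receive()
            // 'let!' may resume on a different pool thread, so the flag
            // must be reset after every bind for this trick to hold.
            keepAlive ()
            printfn "got: %s" msg })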

I think I have solved the problem: add System.Threading.Thread.CurrentThread.IsBackground <- false after the let! in the printer agent.

However, when I tried to modify the original code (the first version, before Tomas's AsyncReplyChannel fix) by adding System.Threading.Thread.CurrentThread.IsBackground <- false after every let!, it still did not work. No idea why.

Thanks for all the help, everyone. I can finally start on my first F# batch application. I think MailboxProcessor should set IsBackground to false by default; anyway, Microsoft, please change it.

[Update] I just found that the compiled assembly works fine. But fsi --use:Program --exec --quiet still behaves the same. It seems to be a bug.
