Returning results to the caller from a throttling queue



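Building on a snippet and an answer, is it possible to return results to the caller from a throttling queue? I have tried PostAndAsyncReply to receive the reply on a channel, but it throws an error when I pipe it with Enqueue. Here is the code.

I would appreciate a solution based on vanilla F# core, built around the queue or mailbox design patterns.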

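Question

The goal is to call a function asynchronously under a throttle (at most 3 at a time), passing it each item from an array, to wait until the whole queue/array has been processed while collecting all the results, and then to return those results to the caller. (Returning the results to the caller is the part that is still pending here.)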

Callee code

// Message type used by the agent - contains queueing
// of work items and notification of completion
type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>
/// Represents an agent that runs operations concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
let throttlingAgent limit =
    MailboxProcessor.Start(fun inbox ->
    async {
      // The agent body is not executing in parallel,
      // so we can safely use mutable queue & counter
      let queue = System.Collections.Generic.Queue<Async<unit>>()
      let running = ref 0
      while true do
        // Enqueue new work items or decrement the counter
        // of how many tasks are running in the background
        let! msg = inbox.Receive()
        match msg with
        | Completed -> decr running
        | Enqueue w -> queue.Enqueue(w)
        // If we have less than limit & there is some work to
        // do, then start the work in the background!
        while running.Value < limit && queue.Count > 0 do
          let work = queue.Dequeue()
          incr running
          do! // When the work completes, send 'Completed'
              // back to the agent to free a slot
              async {
                do! work
                inbox.Post(Completed)
              }
              |> Async.StartChild
              |> Async.Ignore
    })

let requestDetailAsync (url: string) : Async<Result<string, Error>> =
     async {
       Console.WriteLine ("Simulating request " + url)
       try
           do! Async.Sleep(1000) // let's say each request takes about a second
           return Ok (url + ":body...")
       with :? WebException as e ->
           return Error {Code = "500"; Message = "Internal Server Error"; Status = HttpStatusCode.InternalServerError}
     }
let requestMasterAsync() : Async<Result<System.Collections.Concurrent.ConcurrentBag<_>, Error>> =
    async {
        let urls = [|
                    "http://www.example.com/1";
                    "http://www.example.com/2";
                    "http://www.example.com/3";
                    "http://www.example.com/4";
                    "http://www.example.com/5";
                    "http://www.example.com/6";
                    "http://www.example.com/7";
                    "http://www.example.com/8";
                    "http://www.example.com/9";
                    "http://www.example.com/10";
                |]
        let results = System.Collections.Concurrent.ConcurrentBag<_>()
        let agent = throttlingAgent 3
        for url in urls do
            async {
                let! res = requestDetailAsync url
                results.Add res
            }
            |> Enqueue
            |> agent.Post
        return Ok results
    }

Caller code

[<TestMethod>]
member this.TestRequestMasterAsync() =
    match Entity.requestMasterAsync() |> Async.RunSynchronously with
    | Ok result -> Console.WriteLine result
    | Error error -> Console.WriteLine error

You can use Hopac.Streams. With a tool like that, this is trivial:

open Hopac
open Hopac.Stream
open System
let requestDetailAsync url = async {
   Console.WriteLine ("Simulating request " + url)
   try
       do! Async.Sleep(1000) // let's say each request takes about a second
       return Ok (url + ":body...")
   with e -> // every exception is already an exn, so no type test is needed
       return Error e
 }
let requestMasterAsync() : Stream<Result<string,exn>> =
    [| "http://www.example.com/1"
       "http://www.example.com/2"
       "http://www.example.com/3"
       "http://www.example.com/4"
       "http://www.example.com/5"
       "http://www.example.com/6"
       "http://www.example.com/7"
       "http://www.example.com/8"
       "http://www.example.com/9"
       "http://www.example.com/10" |]
    |> Stream.ofSeq
    |> Stream.mapPipelinedJob 3 (requestDetailAsync >> Job.fromAsync)
requestMasterAsync()
|> Stream.iterFun (printfn "%A")
|> queue //prints all results asynchronously
let allResults : Result<string,exn> list = 
    requestMasterAsync()
    |> Stream.foldFun (fun results cur -> cur::results ) []
    |> run //fold stream into list synchronously
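If an extra dependency is not an option and a mailbox is not strictly required, a similar result might be achievable with the `Async.Parallel` overload that accepts `maxDegreeOfParallelism` (available in FSharp.Core 4.7 and later, as far as I know). The following is only a minimal sketch: the 100 ms delay, the `exn` error type, and passing `urls` as a parameter are assumptions made for illustration, not part of the original code.

```fsharp
open System

// Stand-in for the real request; the delay is simulated (assumption: 100 ms)
let requestDetailAsync (url: string) : Async<Result<string, exn>> =
    async {
        try
            do! Async.Sleep 100
            return Ok (url + ":body...")
        with e ->
            return Error e
    }

// Runs at most 3 requests at a time and completes once all of them finish.
// The result array is in the same order as the input urls.
let requestMasterAsync (urls: string[]) : Async<Result<string, exn>[]> =
    let jobs = urls |> Array.map requestDetailAsync
    Async.Parallel(jobs, maxDegreeOfParallelism = 3)
```

The caller can then run `requestMasterAsync urls |> Async.RunSynchronously` and receive every result in one array, with no shared mutable collection involved.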

Added: if you want to use only vanilla FSharp.Core with a mailbox, you can try the following:

type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>
let inline (>>=) x f = async.Bind(x, f)
let inline (>>-) x f = async.Bind(x, f >> async.Return)
let throttlingAgent limit =
    let agent = MailboxProcessor.Start(fun inbox ->
        let queue = System.Collections.Generic.Queue<Async<unit>>()
        let startWork work = 
            work
            >>- fun _ -> inbox.Post Completed
            |> Async.StartChild |> Async.Ignore
        let rec loop curWorkers =
            inbox.Receive()
            >>= function
            | Completed when queue.Count > 0 -> 
                queue.Dequeue() |> startWork
                >>= fun _ -> loop curWorkers
            | Completed -> 
                loop (curWorkers - 1)
            | Enqueue w when curWorkers < limit ->
                w |> startWork
                >>= fun _ -> loop (curWorkers + 1)
            | Enqueue w ->
                queue.Enqueue w
                loop curWorkers
        loop 0)
    Enqueue >> agent.Post

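It is almost the same logic, but slightly optimized: when there is free worker capacity, the work is started right away and the queue is not used at all (no enqueue/dequeue).

throttlingAgent is a function of type int -> Async<unit> -> unit, because we don't want the client to deal with our internal ThrottlingAgentMessage type.

Use it like this: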

let throttler = throttlingAgent 3
for url in urls do
    async {
        let! res = requestDetailAsync url
        results.Add res
    }
    |> throttler
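The usage above still posts work and moves on, so the caller has no way to know when the results are complete. One possible way to close that gap with plain FSharp.Core is to give each work item a `TaskCompletionSource` and await all of them. This is a sketch under assumptions: `throttled`, `fakeRequest`, and `requestAllAsync` are hypothetical names introduced here, and the agent is restated in `async { }` form (with a `try ... finally` so a failing job still frees its slot) only to keep the block self-contained.

```fsharp
open System.Threading.Tasks

type ThrottlingAgentMessage =
    | Completed
    | Enqueue of Async<unit>

// Same shape as the agent above: int -> Async<unit> -> unit
let throttlingAgent limit =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let queue = System.Collections.Generic.Queue<Async<unit>>()
            // Start the work in the background; always report completion
            let start work =
                async {
                    try
                        do! work
                    finally
                        inbox.Post Completed
                }
                |> Async.Start
            let rec loop running =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Completed when queue.Count > 0 ->
                        start (queue.Dequeue())
                        return! loop running
                    | Completed -> return! loop (running - 1)
                    | Enqueue w when running < limit ->
                        start w
                        return! loop (running + 1)
                    | Enqueue w ->
                        queue.Enqueue w
                        return! loop running
                }
            loop 0)
    Enqueue >> agent.Post

// Hypothetical helper: push 'work' through the throttler and
// return an Async that completes with the result of 'work'
let throttled (throttler: Async<unit> -> unit) (work: Async<'T>) : Async<'T> =
    async {
        let tcs = TaskCompletionSource<'T>()
        async {
            try
                let! res = work
                tcs.SetResult res
            with e ->
                tcs.SetException e
        }
        |> throttler
        return! Async.AwaitTask(tcs.Task)
    }

// Stand-in for the real request (assumption: simulated 100 ms delay)
let fakeRequest (url: string) : Async<string> =
    async {
        do! Async.Sleep 100
        return url + ":body..."
    }

// Completes only when every throttled request has produced its result
let requestAllAsync (urls: string[]) : Async<string[]> =
    async {
        let throttler = throttlingAgent 3
        return!
            urls
            |> Array.map (fun url -> throttled throttler (fakeRequest url))
            |> Async.Parallel
    }
```

Here `Async.Parallel` only waits on the completion sources; the agent still decides how many jobs actually run at once. The results come back in input order, so the `ConcurrentBag` is no longer needed.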
