移植取消排队 - 将循环从 C# 添加到 F#



我正在尝试移植这个简单的循环:

    var messages = new List<string>();
    while (_MessageQueue.TryDequeue(out var message))
        messages.Add(message);

消息队列并发的位置。它用于将来自多个线程的消息排队的模块中,然后由单个线程处理它们。

是否有一种惯用的 F# 方法来执行取消排队/添加循环?

F#

有几种并发方法,包括对基于代理的编程的良好支持,因此您正在执行的操作的惯用 F# 版本很可能实际上不会使用 conrrent 队列,而是基于代理或其他体系结构。

但是,要回答您有关循环的具体问题 - 由于巧妙地使用了whileout var,C#版本非常简洁。在 F# 中,可以将 TryDequeue 作为与值一起返回bool的方法调用(这样我们就可以避免突变(。我会使用它,以及递归序列表达式:

let mq = System.Collections.Concurrent.ConcurrentQueue<int>()
let rec readAll () = seq {
  let succ, msg = mq.TryDequeue() 
  if succ then
      yield msg
      yield! readAll() }
let messages = readAll() |> List.ofSeq

readAll函数定义一个调用TryDequeue的序列(IEnumerable(,如果操作成功,它将使用yield msg将消息添加到结果中,然后递归尝试使用yield!读取更多消息。

这是一个直接转换:

    open System.Collections.Concurrent
    let _MessageQueue = ConcurrentQueue<string>()
    let messages = ResizeArray<string>()
    let mutable continueLooping = true
    while continueLooping do
        let success, message = _MessageQueue.TryDequeue()
        if success then messages.Add(message)
        continueLooping <- success

虽然这个问题已经有一个公认的答案,但我想使用库函数 Seq.unfold 贡献一个替代实现:

let getAllMessages (mq : _ ConcurrentQueue) =
    mq |> Seq.unfold (fun q ->
        match q.TryDequeue () with
        | true, m -> Some (m, q)
        | _ -> None)
let messages = getAllMessages _MessageQueue |> Seq.toList

不确定它在内部是否与Tomas的解决方案一样复杂(甚至更复杂(,但我发现它简短,易于理解且优雅。

我为咯咯笑提供了几个额外的设计选项。常见部分:

open System.Collections.Concurrent
type Message = { I: int }
let queue = ConcurrentQueue<Message>()

drain1调用队列。GetEnumerator((,它的条件是在初始请求时返回快照。快照基本上与 C# 版本中的争用条件相同。

let drain1 () = queue |> Seq.toList

drain2返回一个数组,再次返回初始请求时的快照。如果您可以更改返回类型。

let drain2 () = queue.ToArray()

这是 TryQueue 惯用返回的一个例子,它避免了"out"参数,因此它不像 C# 那样是一个可变值。

let example () =
    let (success, message) = queue.TryDequeue()
    () // ...

最后,一个递归构建的自终止序列。

let drain3 () =
    let rec drain () = seq {
        let success, message = queue.TryDequeue()
        if success then
            yield message
            yield! drain()
        }
    drain() |> Seq.toList

(标准互联网保修适用。

对哈钦森@Scott回答的调侃

我会定义一个高效、直接的助手来封装突变和循环:-

[<AutoOpen>]
module ConcurrentQueueExtensions =
    type System.Collections.Concurrent.ConcurrentQueue<'T> with
        member this.Drain() =
            let buffer = ResizeArray(this.Count)
            let mutable more = true
            while more do
                match this.TryDequeue() with
                | true, req -> buffer.Add req
                | false, _ -> more <- false
            buffer.ToArray()

甚至将该通用帮助程序保留在 C# 中:

class ConcurrentQueueExtensions
{
    public static T[] Drain<T>(this System.Collections.Concurrent.ConcurrentQueue<T> that)
    {
        var buffer = new List<T>(that.Count);
        while (that.TryDequeue(out var req))
            buffer.Add(req);
        return buffer.ToArray();
    }
}

然后在不混合范式的情况下应用它:

let messages = queue.Drain()

最新更新