我正在尝试移植这个简单的循环:
var messages = new List<string>();
while (_MessageQueue.TryDequeue(out var message))
messages.Add(message);
消息队列并发的位置。它用于将来自多个线程的消息排队的模块中,然后由单个线程处理它们。
是否有一种惯用的 F# 方法来执行取消排队/添加循环?
有几种并发方法,包括对基于代理的编程的良好支持,因此您正在执行的操作的惯用 F# 版本很可能实际上不会使用 conrrent 队列,而是基于代理或其他体系结构。
但是,要回答您有关循环的具体问题 - 由于巧妙地使用了while
和out var
,C#版本非常简洁。在 F# 中,可以将 TryDequeue
作为与值一起返回bool
的方法调用(这样我们就可以避免突变(。我会使用它,以及递归序列表达式:
let mq = System.Collections.Concurrent.ConcurrentQueue<int>()
let rec readAll () = seq {
let succ, msg = mq.TryDequeue()
if succ then
yield msg
yield! readAll() }
let messages = readAll() |> List.ofSeq
readAll
函数定义一个调用TryDequeue
的序列(IEnumerable
(,如果操作成功,它将使用yield msg
将消息添加到结果中,然后递归尝试使用yield!
读取更多消息。
这是一个直接转换:
open System.Collections.Concurrent
let _MessageQueue = ConcurrentQueue<string>()
let messages = ResizeArray<string>()
let mutable continueLooping = true
while continueLooping do
let success, message = _MessageQueue.TryDequeue()
if success then messages.Add(message)
continueLooping <- success
虽然这个问题已经有一个公认的答案,但我想使用库函数 Seq.unfold
贡献一个替代实现:
let getAllMessages (mq : _ ConcurrentQueue) =
mq |> Seq.unfold (fun q ->
match q.TryDequeue () with
| true, m -> Some (m, q)
| _ -> None)
let messages = getAllMessages _MessageQueue |> Seq.toList
不确定它在内部是否与Tomas的解决方案一样复杂(甚至更复杂(,但我发现它简短,易于理解且优雅。
我为咯咯笑提供了几个额外的设计选项。常见部分:
open System.Collections.Concurrent
type Message = { I: int }
let queue = ConcurrentQueue<Message>()
drain1
调用队列。GetEnumerator((,它的条件是在初始请求时返回快照。快照基本上与 C# 版本中的争用条件相同。
let drain1 () = queue |> Seq.toList
drain2
返回一个数组,再次返回初始请求时的快照。如果您可以更改返回类型。
let drain2 () = queue.ToArray()
这是 TryQueue 惯用返回的一个例子,它避免了"out"参数,因此它不像 C# 那样是一个可变值。
let example () =
let (success, message) = queue.TryDequeue()
() // ...
最后,一个递归构建的自终止序列。
let drain3 () =
let rec drain () = seq {
let success, message = queue.TryDequeue()
if success then
yield message
yield! drain()
}
drain() |> Seq.toList
(标准互联网保修适用。
我会定义一个高效、直接的助手来封装突变和循环:-
[<AutoOpen>]
module ConcurrentQueueExtensions =
type System.Collections.Concurrent.ConcurrentQueue<'T> with
member this.Drain() =
let buffer = ResizeArray(this.Count)
let mutable more = true
while more do
match this.TryDequeue() with
| true, req -> buffer.Add req
| false, _ -> more <- false
buffer.ToArray()
甚至将该通用帮助程序保留在 C# 中:
class ConcurrentQueueExtensions
{
public static T[] Drain<T>(this System.Collections.Concurrent.ConcurrentQueue<T> that)
{
var buffer = new List<T>(that.Count);
while (that.TryDequeue(out var req))
buffer.Add(req);
return buffer.ToArray();
}
}
然后在不混合范式的情况下应用它:
let messages = queue.Drain()