将侦听器事件添加到 ConcurrentQueue 或 ConcurrentBag?



我有多个任务以 1:1 的方式从队列中获取消息。我想将这些来自每个线程的消息添加到 ConcurrentBag 中,并在它们异步传入时处理它们。这里的目的是尽快将消息从队列中取出,这样队列就不会填满。我只需要一个侦听器的帮助,该侦听器会等到消息被添加到 ConcurrentBag 中,然后我需要将消息从 Bag 中删除并处理它们

private static ConcurrentQueue<string> messageList = new ConcurrentQueue<string>();
private static readonly SemaphoreSlim semaphore = new SemaphoreSlim(50);
void Main (string[] args)
{
List<Task> taskList = new TaskList();
foreach(var job in JobList)
{
taskList.Add(Task.Run(() => ListenToQueue(job.QueueName));
}
Task.WaitAll(taskList.ToArray());
}

private async Task<string> ListenToQueue(string queueName)
{
var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token

//possibly 5000 messages can be on a single queue 
while(!cancellationtoken.IsCancellationRequested)
{
var message = getMessageFromQueue(queueName);
messageList.Enqueue(message); //Add the message from each thread to a thread safe List
}
}

我在这里需要一个侦听器事件,每次将某些内容添加到列表中时,都会触发此事件。此外,我还需要以线程安全的方式从列表中删除消息。

private void Listener()
{
var msg =  string.Empty;
while (messageList.Count > 0)
{
messageList.TryDequeue(out msg)
await semaphore.WaitAsync();
Task.Run(() => 
{ 
try
{
if(!String.IsNullorEmpty(msg))
{
_ = ProcessMessage(msg); // I do not want to await anything but just fire and let it go
}
}
finally
{
sim.Release();
}
});
}    
}

这些天,我建议使用异步兼容的解决方案,例如System.Threading.Channels

private static Channel<string> messageList = Channel.CreateUnbounded<string>();
private async Task<string> ListenToQueue(string queueName)
{
var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;
try
{
var message = await getMessageFromQueue(queueName, cancellationtoken);
await messageList.Writer.WriteAsync(message, cancellationtoken);
}
catch (OperationCanceledException)
{
// ignored
}
}
private async Task Listener()
{
await foreach (var msg in messageList.Reader.ReadAllAsync())
{
if (!string.IsNullOrEmpty(msg))
_ = Task.Run(() => ProcessMessage(msg));
}
}

但是,如果你想(或需要)留在阻塞世界中,那里也有一个解决方案。ConcurrentBag<T>ConcurrentQueue<T>很少直接使用。相反,更常见的是使用BlockingCollection<T>,它包装并发集合并提供更高级别的 API,包括GetConsumingEnumerable

private static BlockingCollection<string> messageList = new();
private async Task<string> ListenToQueue(string queueName)
{
var cancellationtoken = new CancellationTokenSource(TimeSpan.FromMinutes(60)).Token;
try
{
var message = await getMessageFromQueue(queueName, cancellationtoken);
messageList.Add(message, cancellationtoken);
}
catch (OperationCanceledException)
{
// ignored
}
}
private void Listener()
{
foreach (var msg in messageList.GetConsumingEnumerable())
{
if (!string.IsNullOrEmpty(msg))
_ = Task.Run(() => ProcessMessage(msg));
}
}

相关内容

  • 没有找到相关文章

最新更新