消费者/生产者,具有订单和对已消费项目的限制



我有以下场景

  • 我正在编写一个处理文件(作业)的服务器
    • 文件有一个"前缀"和一个时间
    • 文件应该根据时间处理(先处理旧文件),但也要考虑前缀(具有相同前缀的文件不能同时处理)
  • 我有一个线程(带定时器的任务),它监视目录并将文件添加到"队列"(生产者)
  • 我有几个从"队列"(consumer)获取文件的consumer,它们应该符合上面的规则。
    • 每个任务的作业都保存在某个列表中(这表示限制)
  • 有几个消费者,消费者的数量在启动时确定

其中一个要求是能够优雅地停止消费者(立即或让正在进行的流程完成)。

我沿着这条线做了一些事情:

while (processing)
{
//limits number of concurrent tasks
_processingSemaphore.Wait(queueCancellationToken);  
//Take next job when available or wait for cancel signal
currentwork = workQueue.Take(taskCancellationToken);
//check that it can actually process this work
if (CanProcess(currnetWork)
{ 
var task = CreateTask(currentwork)
task.ContinueWith((t) => { //release processing slot });
}
else
//release slot, return job? something else?
}

取消令牌源位于调用方代码中,可以取消。有两种方法可以在不取消正在运行的任务的情况下停止排队。

我厌倦了将"队列"实现为BlockingCollection,包装一个"安全"的SortedSet。一般的想法是有效的(按时间排序),除了我需要找到一个符合约束的新工作的情况。如果我把工作放回队列,并尝试再次接受,我会得到同样的工作。

可以从队列中获取作业,直到我找到合适的作业,然后将"非法"作业返回,但这可能会导致其他消费者在处理无序作业时出现问题

另一种选择是传递一个简单的集合和一种锁定它的方法,然后根据当前约束进行锁定和简单搜索。同样,这意味着编写的代码可能不是线程安全的。

还有其他可以帮助的建议/指针/数据结构吗?

我认为Hans是对的:如果你已经有了一个线程安全的SortedSet(它实现了IProducerConsumerCollection,所以它可以在BlockingCollection中使用),那么你只需要把现在可以处理的文件放进集合中。如果您完成了一个使另一个文件可供处理的文件,请在此时而不是更早地将其他文件添加到集合中。

我会用TPL数据流实现您的需求。看看用它实现生产者-消费者模式的方法。我相信这将满足您的所有要求(包括取消消费者)。

EDIT(对于那些不喜欢阅读文档,但喜欢阅读的人…)

下面是一个如何使用TPL数据流实现需求的示例。这种实现的美妙之处在于,消费者不绑定到单个线程,只在需要处理数据时使用池线程。

static void Main(string[] args)
{
BufferBlock<string> source = new BufferBlock<string>();
var cancellation = new CancellationTokenSource();
LinkConsumer(source, "A", cancellation.Token);
LinkConsumer(source, "B", cancellation.Token);
LinkConsumer(source, "C", cancellation.Token);
// Link an action that will process source values that are not processed by other 
source.LinkTo(new ActionBlock<string>((s) => Console.WriteLine("Default action")));
while (cancellation.IsCancellationRequested == false)
{
ConsoleKey key = Console.ReadKey(true).Key;
switch (key)
{
case ConsoleKey.Escape:
cancellation.Cancel();
break;
default:
Console.WriteLine("Posted value {0} on thread {1}.", key, Thread.CurrentThread.ManagedThreadId);
source.Post(key.ToString());
break;
}
}
source.Complete();
Console.WriteLine("Done.");
Console.ReadLine();
}
private static void LinkConsumer(ISourceBlock<string> source, string prefix, CancellationToken token)
{
// Link a consumer that will buffer and process all input of the specified prefix
var consumer = new ActionBlock<string>(new Action<string>(Process), new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 1, SingleProducerConstrained = true, CancellationToken = token, TaskScheduler = TaskScheduler.Default });
var linkDisposable = source.LinkTo(consumer, (p) => p == prefix);
// Dispose the link (remove the link) when cancellation is requested.
token.Register(linkDisposable.Dispose);
}
private static void Process(string arg)
{
Console.WriteLine("Processed value {0} in thread {1}", arg, Thread.CurrentThread.ManagedThreadId);
// Simulate work
Thread.Sleep(500);
}

相关内容

  • 没有找到相关文章

最新更新