跨线程读取和写入并发队列(C#)



我有一个应用程序,简而言之,它创建了"WebPage"类型的对象。

然后将这些对象插入到SQL数据库中。

我想从数据库中检索这些记录,然后将它们加载到一些文件中。

我创建了一个While循环来读取查询结果,对于返回的每一行,都会创建一个网页对象并将其添加到静态ConcurrentQueue中。

这就是我的问题所在:

我想要一个单独的线程,当ConcurrentQueue上出现新的东西时,它会做出响应并将对象写入我的文件。我已经有了以单线程和串行方式工作的代码,但我想加快它的速度。

我目前在读卡器中有一段来自SQL数据库的代码,当ConcurrentQueue到达一定数量的对象时,它会发送一个自动重置事件(见下文(

if(flow.CheckEngineCapacity >= 2000 || (Convert.ToInt32(totalRows) - numberOfRecords) < 2000)
{
waitHandle.Set();
Thread fileProcessor = new Thread(delegate () { flow.ProcessExportEngineFlow(waitHandle); });
fileProcessor.Start();
}

最终发生的是某种上下文切换,主线程似乎一直在睡眠,直到它完成——我确实尝试过使用wait和async,但怀疑这不是我需要的。

我该如何按照以下模式进行操作

  • 将新对象添加到ConcurrentQueue
  • 当ConcurrentQueue上达到一定数量的对象时,开始取消对象的队列并将它们加载到文件中,同时仍将对象添加到并发队列中

注意,如果concurrentqueue命中了一定数量的对象,它应该阻塞,直到执行Dequeue的线程可以释放一些空间。

我这样做的原因是为了使解决方案尽可能具有性能——瓶颈应该是写入文件和从数据库读取。

下面是我一直试图组合的类的例子:

public class EngineFlow
{
private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();
public bool IncreaseEngineFlow(WebPages page)
{
bool sucessfullyadded = false;
if (_concurrentWebPageList.Count <= 2000)
{
_concurrentWebPageList.Enqueue(page);
sucessfullyadded = true;
}
else
{
return sucessfullyadded;
}
return sucessfullyadded;
}
public int CheckEngineCapacity { get { return _concurrentWebPageList.Count; } }
private WebPages DecreaseEngineFlow()
{
WebPages page;
_concurrentWebPageList.TryDequeue(out page);
return page;
}
public void ProcessExportEngineFlow(AutoResetEvent waitHandle)
{
if (waitHandle.WaitOne() == false)
{
Thread.Sleep(100);
}
else
{
while (!_concurrentWebPageList.IsEmpty)
{
Console.WriteLine(DecreaseEngineFlow().URL);
Console.WriteLine(CheckEngineCapacity);
waitHandle.Set();
}
}
}

最初这是一个生产者和消费者,但我觉得我可能想得太多了。

谢谢@Henk Holterman

新类使用了BlockingCollection,它解决了所有问题:

Task.Run(() =>
{
flow.ProcessExportEngineFlow();
});
Task.Run(() =>
{
while (reader.Read())
{
flow.IncreaseEngineFlow(webpage);
}

类别定义:

private BlockingCollection<WebPages> _concurrentWebPageList = new BlockingCollection<WebPages>(new ConcurrentQueue<WebPages>(), 1000);
//private static ConcurrentQueue<WebPages> _concurrentWebPageList = new ConcurrentQueue<WebPages>();
public void IncreaseEngineFlow(WebPages page)
{
_concurrentWebPageList.Add(page);
}
public WebPages DecreaseEngineFlow()
{
return _concurrentWebPageList.Take();
}
public void ProcessExportEngineFlow()
{
while(!_concurrentWebPageList.IsCompleted)
{
WebPages page = null;
try
{
page = _concurrentWebPageList.Take();
}
catch (InvalidOperationException) { }
if(page != null)
{
Console.WriteLine(page.URL);
}
}
}
public bool GetEngineState()
{
return _concurrentWebPageList.IsCompleted;
}
public void SetEngineCompleted()
{
_concurrentWebPageList.CompleteAdding();
}

最新更新