在.NET 4.0中应该如何处理从C#中的设备读取大容量数据?具体来说,我需要从一个USB HID设备上快速读取,该设备在26个数据包上发出报告,其中必须保持顺序。
我在BackgroundWorker线程中尝试过这样做。它一次从设备中读取一个数据包,并在读取更多数据包之前对其进行处理。这提供了相当好的响应时间,但它很容易在各处丢失数据包,并且单个数据包读取的开销加起来。
while (!( sender as BackgroundWorker ).CancellationPending) {
//read a single packet
//check for header or footer
//process packet data
}
}
C#中读取这样的设备的最佳实践是什么?
背景:
我的USB HID设备持续报告大量数据。数据被分成26个数据包,我必须保留顺序。不幸的是,该设备只标记每个报告中的第一个数据包和最后一个数据包,因此我需要能够捕获其间的其他数据包。
对于.Net 4,您可以使用BlockingCollection
来提供线程安全队列,生产者和消费者都可以使用该队列。BlockingCollection.GetConsumingEnumerable()
方法提供了一个枚举器,当队列已使用CompleteAdding()
标记为已完成且为空时,该枚举器会自动终止。
下面是一些示例代码。在本例中,有效负载是一个int数组,但您当然可以使用所需的任何数据类型。
请注意,对于您的特定示例,您可以使用GetConsumingEnumerable()
的重载,它接受类型为CancellationToken
的参数。
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
public static class Program
{
private static void Main()
{
var queue = new BlockingCollection<int[]>();
Task.Factory.StartNew(() => produce(queue));
consume(queue);
Console.WriteLine("Finished.");
}
private static void consume(BlockingCollection<int[]> queue)
{
foreach (var item in queue.GetConsumingEnumerable())
{
Console.WriteLine("Consuming " + item[0]);
Thread.Sleep(25);
}
}
private static void produce(BlockingCollection<int[]> queue)
{
for (int i = 0; i < 1000; ++i)
{
Console.WriteLine("Producing " + i);
var payload = new int[100];
payload[0] = i;
queue.Add(payload);
Thread.Sleep(20);
}
queue.CompleteAdding();
}
}
}
对于.Net 4.5及更高版本,您可以使用Microsoft的任务并行库中的高级类,该库具有丰富的功能(乍一看可能有些令人生畏)。
以下是使用TPL数据流的相同示例:
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Demo
{
public static class Program
{
private static void Main()
{
var queue = new BufferBlock<int[]>();
Task.Factory.StartNew(() => produce(queue));
consume(queue).Wait();
Console.WriteLine("Finished.");
}
private static async Task consume(BufferBlock<int[]> queue)
{
while (await queue.OutputAvailableAsync())
{
var payload = await queue.ReceiveAsync();
Console.WriteLine("Consuming " + payload[0]);
await Task.Delay(25);
}
}
private static void produce(BufferBlock<int[]> queue)
{
for (int i = 0; i < 1000; ++i)
{
Console.WriteLine("Producing " + i);
var payload = new int[100];
payload[0] = i;
queue.Post(payload);
Thread.Sleep(20);
}
queue.Complete();
}
}
}
如果丢失数据包是一个问题,请不要在同一个线程上进行处理和读取。从.NET 4.0开始,他们添加了System.Collections.Concurrent
命名空间,这使得这很容易做到。您只需要一个BlockingCollection
,它可以作为传入数据包的队列。
BlockingCollection<Packet> _queuedPackets = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
void readingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
while (!( sender as BackgroundWorker ).CancellationPending)
{
Packet packet = GetPacket();
_queuedPackets.Add(packet);
}
_queuedPackets.CompleteAdding();
}
void processingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
List<Packet> report = new List<Packet>();
foreach(var packet in _queuedPackets.GetConsumingEnumerable())
{
report.Add(packet);
if(packet.IsLastPacket)
{
ProcessReport(report);
report = new List<Packet>();
}
}
}
当_queuedPackets
为空时,_queuedPackets.GetConsumingEnumerable()
将阻止线程不消耗任何资源。一旦数据包到达,它就会解除锁定并执行foreach的下一次迭代。
当您调用_queuedPackets.CompleteAdding();
时,处理线程上的foreach将运行,直到集合为空,然后退出foreach循环。如果你不想在取消时"排完队",你可以很容易地将其更改为提前退出。我还将改用任务而不是后台工作程序,因为这使传入参数更容易完成
void ReadingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
while (!token.IsCancellationRequested)
{
Packet packet = GetPacket();
queue.Add(packet);
}
queue.CompleteAdding();
}
void ProcessingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
List<Packet> report = new List<Packet>();
try
{
foreach(var packet in queue.GetConsumingEnumerable(token))
{
report.Add(packet);
if(packet.IsLastPacket)
{
ProcessReport(report);
report = new List<Packet>();
}
}
}
catch(OperationCanceledException)
{
//Do nothing, we don't care that it happened.
}
}
//This would replace your backgroundWorker.RunWorkerAsync() calls;
private void StartUpLoops()
{
var queue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
var cancelRead = new CancellationTokenSource();
var cancelProcess = new CancellationTokenSource();
Task.Factory.StartNew(() => ReadingLoop(queue, cancelRead.Token));
Task.Factory.StartNew(() => ProcessingLoop(queue, cancelProcess.Token));
//You can stop each loop indpendantly by calling cancelRead.Cancel() or cancelProcess.Cancel()
}