在C#中实现线程进行快速、批量和连续读取的最佳实践



在.NET 4.0中应该如何处理从C#中的设备读取大容量数据?具体来说,我需要从一个USB HID设备上快速读取,该设备在26个数据包上发出报告,其中必须保持顺序。

我在BackgroundWorker线程中尝试过这样做。它一次从设备中读取一个数据包,并在读取更多数据包之前对其进行处理。这提供了相当好的响应时间,但它很容易在各处丢失数据包,并且单个数据包读取的开销加起来。

while (!( sender as BackgroundWorker ).CancellationPending) {
       //read a single packet
       //check for header or footer
       //process packet data
    }
}

C#中读取这样的设备的最佳实践是什么?


背景:

我的USB HID设备持续报告大量数据。数据被分成26个数据包,我必须保留顺序。不幸的是,该设备只标记每个报告中的第一个数据包和最后一个数据包,因此我需要能够捕获其间的其他数据包。

对于.Net 4,您可以使用BlockingCollection来提供线程安全队列,生产者和消费者都可以使用该队列。BlockingCollection.GetConsumingEnumerable()方法提供了一个枚举器,当队列已使用CompleteAdding()标记为已完成且为空时,该枚举器会自动终止。

下面是一些示例代码。在本例中,有效负载是一个int数组,但您当然可以使用所需的任何数据类型。

请注意,对于您的特定示例,您可以使用GetConsumingEnumerable()的重载,它接受类型为CancellationToken的参数。

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BlockingCollection<int[]>();
            Task.Factory.StartNew(() => produce(queue));
            consume(queue);
            Console.WriteLine("Finished.");
        }
        private static void consume(BlockingCollection<int[]> queue)
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Consuming " + item[0]);
                Thread.Sleep(25);
            }
        }
        private static void produce(BlockingCollection<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Add(payload);
                Thread.Sleep(20);
            }
            queue.CompleteAdding();
        }
    }
}

对于.Net 4.5及更高版本,您可以使用Microsoft的任务并行库中的高级类,该库具有丰富的功能(乍一看可能有些令人生畏)。

以下是使用TPL数据流的相同示例:

using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
namespace Demo
{
    public static class Program
    {
        private static void Main()
        {
            var queue = new BufferBlock<int[]>();
            Task.Factory.StartNew(() => produce(queue));
            consume(queue).Wait();
            Console.WriteLine("Finished.");
        }
        private static async Task consume(BufferBlock<int[]> queue)
        {
            while (await queue.OutputAvailableAsync())
            {
                var payload = await queue.ReceiveAsync();
                Console.WriteLine("Consuming " + payload[0]);
                await Task.Delay(25);
            }
        }
        private static void produce(BufferBlock<int[]> queue)
        {
            for (int i = 0; i < 1000; ++i)
            {
                Console.WriteLine("Producing " + i);
                var payload = new int[100];
                payload[0] = i;
                queue.Post(payload);
                Thread.Sleep(20);
            }
            queue.Complete();
        }
    }
}

如果丢失数据包是一个问题,请不要在同一个线程上进行处理和读取。从.NET 4.0开始,他们添加了System.Collections.Concurrent命名空间,这使得这很容易做到。您只需要一个BlockingCollection,它可以作为传入数据包的队列。

BlockingCollection<Packet> _queuedPackets = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());
void readingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
    while (!( sender as BackgroundWorker ).CancellationPending) 
    {
       Packet packet = GetPacket();
       _queuedPackets.Add(packet);
    }        
    _queuedPackets.CompleteAdding();
}
void processingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e)
{
    List<Packet> report = new List<Packet>();
    foreach(var packet in _queuedPackets.GetConsumingEnumerable())
    {
        report.Add(packet);
        if(packet.IsLastPacket)
        {
            ProcessReport(report);
            report = new List<Packet>();
        }
    }
}

_queuedPackets为空时,_queuedPackets.GetConsumingEnumerable()将阻止线程不消耗任何资源。一旦数据包到达,它就会解除锁定并执行foreach的下一次迭代。

当您调用_queuedPackets.CompleteAdding();时,处理线程上的foreach将运行,直到集合为空,然后退出foreach循环。如果你不想在取消时"排完队",你可以很容易地将其更改为提前退出。我还将改用任务而不是后台工作程序,因为这使传入参数更容易完成

void ReadingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
    while (!token.IsCancellationRequested) 
    {
       Packet packet = GetPacket();
       queue.Add(packet);
    }        
    queue.CompleteAdding();
}
void ProcessingLoop(BlockingCollection<Packet> queue, CancellationToken token)
{
    List<Packet> report = new List<Packet>();
    try
    {
        foreach(var packet in queue.GetConsumingEnumerable(token))
        {
            report.Add(packet);
            if(packet.IsLastPacket)
            {
                ProcessReport(report);
                report = new List<Packet>();
            }
        }
    }
    catch(OperationCanceledException)
    {
        //Do nothing, we don't care that it happened.
    }
}
//This would replace your backgroundWorker.RunWorkerAsync() calls;
private void StartUpLoops()
{
    var queue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());    
    var cancelRead = new CancellationTokenSource();
    var cancelProcess = new CancellationTokenSource();
    Task.Factory.StartNew(() => ReadingLoop(queue, cancelRead.Token));
    Task.Factory.StartNew(() => ProcessingLoop(queue, cancelProcess.Token));
    //You can stop each loop indpendantly by calling cancelRead.Cancel() or cancelProcess.Cancel()
}

最新更新