2015-02-06 1 views
2

Как следует считывать объемные данные с устройства на C# в .NET 4.0? В частности, мне нужно быстро читать с USB-HID-устройства, которое испускает отчеты по 26 пакетам, где заказ должен быть сохранен.Рекомендации по реализации потока для быстрого, массового и непрерывного чтения в C#?

Я пробовал делать это в потоке BackgroundWorker. Он считывает один пакет из устройства за раз и обрабатывает его, прежде чем читать больше. Это дает разумное время отклика, но оно может потерять пакет здесь и там, и накладные расходы на чтение одного пакета складываются.

while (!(sender as BackgroundWorker).CancellationPending) { 
     //read a single packet 
     //check for header or footer 
     //process packet data 
    } 
} 

Какова наилучшая практика в C# для чтения такого устройства?


фона:

Мой USB HID устройство непрерывно передает большое количество данных. Данные разделены на 26 пакетов, и я должен зарезервировать заказ. К сожалению, устройство отмечает только первые пакеты в каждом отчете, поэтому мне нужно уловить каждый другой пакет между ними.

+0

Какая версия .net? Ответ будет зависеть от этого. –

+0

@MatthewWatson Я нацелился на .NET 4.0, но было бы замечательно, если бы ответ объяснил разницу с другими версиями. – Semaphore

+0

Скорость передачи данных HID очень низкая, не более 8 КБ/сек. Вы не можете писать код, который не может идти в ногу с этим, не требуется «передовой практики». –

ответ

5

Для .Net 4 вы можете использовать BlockingCollection для обеспечения очереди потокобезопасности, которая может использоваться производителем и потребителем. Метод BlockingCollection.GetConsumingEnumerable() предоставляет перечислитель, который автоматически завершается, когда очередь помечена как завершенная с использованием CompleteAdding() и пуста.

Вот пример кода. В этом примере полезная нагрузка представляет собой массив ints, но, конечно, вы будете использовать любой тип данных, который вам нужен.

Обратите внимание, что для вашего конкретного примера вы можете использовать the overload of GetConsumingEnumerable(), который принимает аргумент типа CancellationToken.

using System; 
using System.Collections.Concurrent; 
using System.Threading; 
using System.Threading.Tasks; 

namespace Demo 
{ 
    public static class Program 
    { 
     private static void Main() 
     { 
      var queue = new BlockingCollection<int[]>(); 

      Task.Factory.StartNew(() => produce(queue)); 

      consume(queue); 

      Console.WriteLine("Finished."); 
     } 

     private static void consume(BlockingCollection<int[]> queue) 
     { 
      foreach (var item in queue.GetConsumingEnumerable()) 
      { 
       Console.WriteLine("Consuming " + item[0]); 
       Thread.Sleep(25); 
      } 
     } 

     private static void produce(BlockingCollection<int[]> queue) 
     { 
      for (int i = 0; i < 1000; ++i) 
      { 
       Console.WriteLine("Producing " + i); 
       var payload = new int[100]; 
       payload[0] = i; 
       queue.Add(payload); 
       Thread.Sleep(20); 
      } 

      queue.CompleteAdding(); 
     } 
    } 
} 

Для .Net 4.5 и более поздних версиях можно использовать классы более высокого уровня из Microsoft's Task Parallel Library, который имеет множество функциональных возможностей (и может быть несколько сложной на первый взгляд).

Вот тот же самый пример использования TPL DataFlow:

using System; 
using System.Threading; 
using System.Threading.Tasks; 
using System.Threading.Tasks.Dataflow; 

namespace Demo 
{ 
    public static class Program 
    { 
     private static void Main() 
     { 
      var queue = new BufferBlock<int[]>(); 

      Task.Factory.StartNew(() => produce(queue)); 
      consume(queue).Wait(); 

      Console.WriteLine("Finished."); 
     } 

     private static async Task consume(BufferBlock<int[]> queue) 
     { 
      while (await queue.OutputAvailableAsync()) 
      { 
       var payload = await queue.ReceiveAsync(); 
       Console.WriteLine("Consuming " + payload[0]); 
       await Task.Delay(25); 
      } 
     } 

     private static void produce(BufferBlock<int[]> queue) 
     { 
      for (int i = 0; i < 1000; ++i) 
      { 
       Console.WriteLine("Producing " + i); 
       var payload = new int[100]; 
       payload[0] = i; 
       queue.Post(payload); 
       Thread.Sleep(20); 
      } 

      queue.Complete(); 
     } 
    } 
} 
+1

Я собирался опубликовать очень похожий ответ, единственное, что у меня было, это то, что у меня было «производство» и «потребление» в «CancellationToken», чтобы я мог воссоздать поведение OP '(! (Отправитель как BackgroundWorker) .Отмена Пенсии) ' –

+0

@ScottChamberlain Хорошая точка, я добавлю это. –

+1

Я приложил к этому много работы, я все равно решил опубликовать свой ответ. –

4

Если отсутствуют пакеты является проблемой не делают вашу обработку и ваше чтение на том же потоке. Начиная с .NET 4.0, они добавили пространство имен System.Collections.Concurrent, что делает это очень легко сделать. Все, что вам нужно, это BlockingCollection, который ведет себя как очередь для входящих пакетов.

BlockingCollection<Packet> _queuedPackets = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>()); 

void readingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e) 
{ 
    while (!(sender as BackgroundWorker).CancellationPending) 
    { 
     Packet packet = GetPacket(); 
     _queuedPackets.Add(packet); 
    }   
    _queuedPackets.CompleteAdding(); 
} 

void processingBackgroundWorker_DoWork(object sender, DoWorkEventArgs e) 
{ 
    List<Packet> report = new List<Packet>(); 
    foreach(var packet in _queuedPackets.GetConsumingEnumerable()) 
    { 
     report.Add(packet); 
     if(packet.IsLastPacket) 
     { 
      ProcessReport(report); 
      report = new List<Packet>(); 
     } 
    } 
} 

Что будет происходить в то время как _queuedPackets пуст _queuedPackets.GetConsumingEnumerable() будет блокировать поток не потребляет никаких ресурсов. Как только пакет поступит, он разблокируется и выполняет следующую итерацию foreach.

Когда вы вызываете _queuedPackets.CompleteAdding();, foreach на вашем потоке обработки будет работать до тех пор, пока коллекция не будет пуста, а затем выйдите из цикла foreach. Если вы не хотите, чтобы он «закончил очередь», когда вы отменяете, вы можете легко изменить его, чтобы выйти раньше. Я также собираюсь переключиться на использование задач вместо фоновых работников, потому что упрощает передачу параметров в параметрах.

void ReadingLoop(BlockingCollection<Packet> queue, CancellationToken token) 
{ 
    while (!token.IsCancellationRequested) 
    { 
     Packet packet = GetPacket(); 
     queue.Add(packet); 
    }   
    queue.CompleteAdding(); 
} 

void ProcessingLoop(BlockingCollection<Packet> queue, CancellationToken token) 
{ 
    List<Packet> report = new List<Packet>(); 

    try 
    { 
     foreach(var packet in queue.GetConsumingEnumerable(token)) 
     { 
      report.Add(packet); 
      if(packet.IsLastPacket) 
      { 
       ProcessReport(report); 
       report = new List<Packet>(); 
      } 
     } 
    } 
    catch(OperationCanceledException) 
    { 
     //Do nothing, we don't care that it happened. 
    } 
} 

//This would replace your backgroundWorker.RunWorkerAsync() calls; 
private void StartUpLoops() 
{ 
    var queue = new BlockingCollection<Packet>(new ConcurrentQueue<Packet>());  
    var cancelRead = new CancellationTokenSource(); 
    var cancelProcess = new CancellationTokenSource(); 
    Task.Factory.StartNew(() => ReadingLoop(queue, cancelRead.Token)); 
    Task.Factory.StartNew(() => ProcessingLoop(queue, cancelProcess.Token)); 

    //You can stop each loop indpendantly by calling cancelRead.Cancel() or cancelProcess.Cancel() 
} 
+0

Спасибо, что решили опубликовать это, это очень информативно +1 – Semaphore