2010-08-28 2 views
6

Итак, это продолжение моего последнего вопроса. Так что вопрос был «Какой лучший способ создать программу, которая является потокобезопасной, в том смысле, что ей нужно записать двойные значения в файл. Если функция, которая сохраняет значения через streamwriter вызываются несколькими потоками? Каков наилучший способ сделать это? "Thread safe StreamWriter C# как это сделать? 2

И я изменил код, найденный в MSDN, как насчет следующего? Это правильно записывает все в файл.

namespace SafeThread 
{ 
    class Program 
    { 
     static void Main() 
     { 
      Threading threader = new Threading(); 

      AutoResetEvent autoEvent = new AutoResetEvent(false); 

      Thread regularThread = 
       new Thread(new ThreadStart(threader.ThreadMethod)); 
      regularThread.Start(); 

      ThreadPool.QueueUserWorkItem(new WaitCallback(threader.WorkMethod), 
       autoEvent); 

      // Wait for foreground thread to end. 
      regularThread.Join(); 

      // Wait for background thread to end. 
      autoEvent.WaitOne(); 
     } 
    } 


    class Threading 
    { 
     List<double> Values = new List<double>(); 
     static readonly Object locker = new Object(); 
     StreamWriter writer = new StreamWriter("file"); 
     static int bulkCount = 0; 
     static int bulkSize = 100000; 

     public void ThreadMethod() 
     { 
      lock (locker) 
      { 
       while (bulkCount < bulkSize) 
        Values.Add(bulkCount++); 
      } 
      bulkCount = 0; 
     } 

     public void WorkMethod(object stateInfo) 
     { 
      lock (locker) 
      { 
       foreach (double V in Values) 
       { 
        writer.WriteLine(V); 
        writer.Flush(); 
       } 
      } 
      // Signal that this thread is finished. 
      ((AutoResetEvent)stateInfo).Set(); 
     } 
    } 
} 
+4

Некоторые комментарии с downvotes были бы приятными. –

ответ

9

Thread и QueueUserWorkItem являются наименее доступными API для нарезки. Я бы не использовал их, если бы у меня, наконец, не было другого выбора. Попробуйте класс Task для гораздо более высокого уровня абстракции. Для получения дополнительной информации, see my recent blog post on the subject.

Вы также можете использовать BlockingCollection<double> как надлежащего производитель/потребитель очереди вместо того, чтобы пытаться построить один вручную с низких доступных интерфейсов API для синхронизации.

Повторное включение этих колес на удивление сложно. Я настоятельно рекомендую использовать классы, предназначенные для такого типа потребностей (например, Task и BlockingCollection). Они встроены в платформу .NET 4.0 и are available as an add-on for .NET 3.5.

2

код у вас есть там тонко сломана - в частности, если в очередь рабочий элемент проходит первый, то он будет промывать (пустой) список значений сразу, перед завершением, после чего ваш работник идет и заполняет список (который в конечном итоге игнорируется). Событие автоматического сброса также ничего не делает, поскольку ничто никогда не запрашивает или не ждет его состояния.

Кроме того, поскольку каждая нить использует другой замок, замки не имеют никакого значения! При доступе к потоковому обозревателю необходимо обеспечить единую блокировку. Вам не нужна блокировка между кодом промывки и кодом генерации; вам просто нужно убедиться, что флеш запускается после завершения поколений.

Возможно, вы, вероятно, на правильном пути, хотя я бы использовал массив фиксированного размера вместо списка и очищал все записи из массива, когда он заполняется. Это позволяет избежать нехватки памяти, если поток долговечен.

+0

Как происходит промывка, если список пуст? – 2010-08-28 15:51:42

+1

Я объяснил это в своем ответе - работа в очереди «WorkMethod» может запускать _before_ поток ThreadMethod. И он также может работать после. Вы не можете предсказать, что, потому что вы не задали никакого явного порядка здесь. – bdonlan

6
  • Код имеет автора как экземпляр var, но использует статический шкафчик. Если у вас было несколько экземпляров, пишущих в разные файлы, нет причин, по которым они должны были бы использовать один и тот же замок
  • в соответствующей заметке, поскольку у вас уже есть писатель (в качестве частного экземпляра var), вы можете использовать это для блокировки вместо использования отдельного предмета-замка в этом случае - это делает вещи немного проще.

«Правильный ответ» действительно зависит от того, что вы ищете в плане блокировки/блокировки. Например, проще всего пропустить промежуточную структуру данных, просто иметь метод WriteValues, чтобы каждый поток сообщал свои результаты и записывал их в файл.Что-то вроде:

StreamWriter writer = new StreamWriter("file"); 
public void WriteValues(IEnumerable<double> values) 
{ 
    lock (writer) 
    { 
     foreach (var d in values) 
     { 
      writer.WriteLine(d); 
     } 
     writer.Flush(); 
    } 
} 

Конечно, это означает, что рабочие потоки сериализации на этапах их «отчета о результатах» - в зависимости от технических характеристик, которые могут быть просто отлично, хотя (5 минут, чтобы генерировать, 500мс писать, например,).

На другом конце спектра вы должны писать рабочие потоки в структуру данных. Если вы в .NET 4, я бы рекомендовал использовать только ConcurrentQueue, а не делать это самостоятельно.

Кроме того, вы можете делать файлы ввода/вывода в больших партиях, чем те, которые сообщаются рабочими потоками, поэтому вы можете просто писать в фоновом потоке на некоторой частоте. Это конец спектра выглядит как ниже (вы бы удалить Console.WriteLine звонки в реальном коде, это только там, так что вы можете видеть, что это работает в действии)

public class ThreadSafeFileBuffer<T> : IDisposable 
{ 
    private readonly StreamWriter m_writer; 
    private readonly ConcurrentQueue<T> m_buffer = new ConcurrentQueue<T>(); 
    private readonly Timer m_timer; 

    public ThreadSafeFileBuffer(string filePath, int flushPeriodInSeconds = 5) 
    { 
     m_writer = new StreamWriter(filePath); 
     var flushPeriod = TimeSpan.FromSeconds(flushPeriodInSeconds); 
     m_timer = new Timer(FlushBuffer, null, flushPeriod, flushPeriod); 
    } 

    public void AddResult(T result) 
    { 
     m_buffer.Enqueue(result); 
     Console.WriteLine("Buffer is up to {0} elements", m_buffer.Count); 
    } 

    public void Dispose() 
    { 
     Console.WriteLine("Turning off timer"); 
     m_timer.Dispose(); 
     Console.WriteLine("Flushing final buffer output"); 
     FlushBuffer(); // flush anything left over in the buffer 
     Console.WriteLine("Closing file"); 
     m_writer.Dispose(); 
    } 

    /// <summary> 
    /// Since this is only done by one thread at a time (almost always the background flush thread, but one time via Dispose), no need to lock 
    /// </summary> 
    /// <param name="unused"></param> 
    private void FlushBuffer(object unused = null) 
    { 
     T current; 
     while (m_buffer.TryDequeue(out current)) 
     { 
      Console.WriteLine("Buffer is down to {0} elements", m_buffer.Count); 
      m_writer.WriteLine(current); 
     } 
     m_writer.Flush(); 
    } 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
     var tempFile = Path.GetTempFileName(); 
     using (var resultsBuffer = new ThreadSafeFileBuffer<double>(tempFile)) 
     { 
      Parallel.For(0, 100, i => 
      { 
       // simulate some 'real work' by waiting for awhile 
       var sleepTime = new Random().Next(10000); 
       Console.WriteLine("Thread {0} doing work for {1} ms", Thread.CurrentThread.ManagedThreadId, sleepTime); 
       Thread.Sleep(sleepTime); 
       resultsBuffer.AddResult(Math.PI*i); 
      }); 
     } 
     foreach (var resultLine in File.ReadAllLines(tempFile)) 
     { 
      Console.WriteLine("Line from result: {0}", resultLine); 
     } 
    } 
} 
4

Так ты говоришь, хотите кучу потоков для записи данных в один файл с помощью StreamWriter? Легко. Просто заблокируйте объект StreamWriter.

Код здесь будет создавать 5 тем. Каждый поток будет выполнять 5 «действий», и в конце каждого действия он будет писать 5 строк в файл с именем «файл».

using System; 
using System.Collections.Generic; 
using System.IO; 
using System.Threading; 

namespace ConsoleApplication1 { 
    class Program { 
     static void Main() { 
      StreamWriter Writer = new StreamWriter("file"); 

      Action<int> ThreadProcedure = (i) => { 
       // A thread may perform many actions and write out the result after each action 
       // The outer loop here represents the multiple actions this thread will take 
       for (int x = 0; x < 5; x++) { 
        // Here is where the thread would generate the data for this action 
        // Well simulate work time using a call to Sleep 
        Thread.Sleep(1000); 
        // After generating the data the thread needs to lock the Writer before using it. 
        lock (Writer) { 
         // Here we'll write a few lines to the Writer 
         for (int y = 0; y < 5; y++) { 
          Writer.WriteLine("Thread id = {0}; Action id = {1}; Line id = {2}", i, x, y); 
         } 
        } 
       } 
      }; 

      //Now that we have a delegate for the thread code lets make a few instances 

      List<IAsyncResult> AsyncResultList = new List<IAsyncResult>(); 
      for (int w = 0; w < 5; w++) { 
       AsyncResultList.Add(ThreadProcedure.BeginInvoke(w, null, null)); 
      } 

      // Wait for all threads to complete 
      foreach (IAsyncResult r in AsyncResultList) { 
       r.AsyncWaitHandle.WaitOne(); 
      } 

      // Flush/Close the writer so all data goes to disk 
      Writer.Flush(); 
      Writer.Close(); 
     } 
    } 
} 

Результат должен быть файл «файл» с 125 линий в нем со всеми «действиями», выполняемых одновременно и результатом каждого действия синхронно записывается в файл.

+0

Вы не должны блокировать объекты, где вы не контролируете их реализацию, как это - что, если он внутренне блокирует себя в другом потоке? - вы должны создать новый «объект» для использования в качестве блокировки. – bdonlan

+0

Не используйте отдельный объект для блокировки. Блокировка объекта напрямую - единственный способ гарантировать, что все потоки, которые могут видеть объект, могут получить исключительную блокировку объекта. –

+0

Довольно круто, именно то, что я хотел, спасибо! – 2010-08-29 16:03:39