2017-01-30 9 views
1

Я реализую lock-free queue, описанный в «C++ Concurrency in Action» Энтони Уильямса. Я тестирую его как новый контейнер libcds. Тесты на поп и толчок работают нормально. Но несколько продюсеров, множественное тестирование потребителя иногда случается. VLD (или Intel Inspector XE или ASan) показывает утечку памяти. Я исправляю это, добавляя деструктор узла, но проблема с присутствием всех элементов остается. Как я могу решить эту проблему? Благодарю.Удаление узла узла без блокировки Энтони Уильямса

Williams безблокировочного очередь:

#include <memory> 
template <class T> 
class williams_queue 
{ 
public: 
    williams_queue() 
    { 
    counted_node_ptr counted_node; 
    counted_node.ptr = new node; 
    counted_node.external_count = 1; 

    head_.store(counted_node); 
    tail_.store(head_); 
    } 

    williams_queue(const lock_free_queue_mpmc& other) = delete; 
    williams_queue& operator=(const lock_free_queue_mpmc& other) = delete; 

    ~williams_queue() 
    { 
    counted_node_ptr old_head = head_.load(); 
    while (node* const old_node = old_head.ptr) 
    { 
     head_.store(old_node->next); 
     delete old_node; 
     old_head = head_.load(); 
    } 
    } 

    void push(const T& new_value) 
    { 
    std::unique_ptr<T> new_data(new T(new_value)); 

    counted_node_ptr new_next; 
    new_next.ptr = new node; 
    new_next.external_count = 1; 
    counted_node_ptr old_tail = tail_.load(); 

    while (true) 
    { 
     increase_external_count(tail_, old_tail); 
     T* old_data = nullptr; 
     if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get())) 
     { 
     counted_node_ptr old_next = {0}; 
     if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) 
     { 
      delete new_next.ptr; 
      new_next = old_next; 
     } 
     set_new_tail(old_tail, new_next); 
     new_data.release(); 
     break; 
     } 
     else 
     { 
     counted_node_ptr old_next = {0}; 
     if(old_tail.ptr->next.compare_exchange_strong(old_next, new_next)) 
     { 
      old_next = new_next; 
      new_next.ptr = new node; 
     } 
     set_new_tail(old_tail, old_next); 
     } 
    } 
    } 

    bool pop(Func f) 
    { 
    counted_node_ptr old_head = head_.load(std::memory_order_relaxed); 
    while (true) 
    { 
     increase_external_count(head_, old_head); 
     node* const ptr = old_head.ptr; 
     if(ptr == tail_.load().ptr) 
     { 
     release_ref(p); 
     return false; 
     } 
     counted_node_ptr next = ptr->next.load(); 
     if (head_.compare_exchange_strong(old_head,next)) 
     { 
     T* const res = ptr->data.exchange(nullptr); 
     free_external_counter(old_head); 
     f(res.get()); 
     return true; 
     } 
     ptr->release_ref(); 
    } 
    } 

private: 
    struct node; 

    struct counted_node_ptr 
    { 
    int external_count; 
    node* ptr; 
    }; 

    struct node_counter 
    { 
    unsigned internal_count : 30; 
    unsigned external_counters : 2; 
    }; 

    struct node 
    { 
    std::atomic<T*> data; 
    std::atomic<node_counter> count; 
    std::atomic<counted_node_ptr> next; 

    node() 
    { 
     node_counter new_count; 
     new_count.internal_count = 0; 
     new_count.external_counters = 2; 
     count.store(new_count); 

     counted_node_ptr new_next; 
     new_next.ptr   = nullptr; 
     new_next.external_count = 0; 
     next.store(new_next); 
    } 

    }; 

    static void release_ref(node * p) 
    { 
     node_counter old_counter = p->count.load(std::memory_order_relaxed); 
     node_counter new_counter; 

     do 
     { 
     new_counter=old_counter; 
     --new_counter.internal_count; 
     } 
     while(!p->count.compare_exchange_strong(old_counter, new_counter, 
             std::memory_order_acquire, 
             std::memory_order_relaxed)); 

     if(!new_counter.internal_count && !new_counter.external_counters) 
     { 
     delete p; 
     } 
    } 

private: 
    void set_new_tail(counted_node_ptr& old_tail, 
        const counted_node_ptr& new_tail) 
    { 
    node* const current_tail_ptr = old_tail.ptr; 

    while (!tail_.compare_exchange_weak(old_tail, new_tail) && 
     old_tail.ptr == current_tail_ptr); 

    if(old_tail.ptr == current_tail_ptr) 
    { 
     free_external_counter(old_tail); 
    } 
    else 
    { 
     release_ref(current_tail_ptr); 
    } 
    } 

    static void increase_external_count(std::atomic<counted_node_ptr>&  counter, 
             counted_node_ptr& old_counter) 
    { 
    counted_node_ptr new_counter; 

    do 
    { 
     new_counter = old_counter; 
     ++new_counter.external_count; 
    } 
    while(!counter.compare_exchange_strong(old_counter, new_counter, 
             std::memory_order_acquire, 
             std::memory_order_relaxed)); 

    old_counter.external_count = new_counter.external_count; 
    } 

    static void free_external_counter(counted_node_ptr& old_node_ptr) 
    { 
    node* const ptr = old_node_ptr.ptr; 
    const int count_increase = old_node_ptr.external_count - 2; 
    node_counter old_counter= ptr->count.load(std::memory_order_relaxed); 
    node_counter new_counter; 

    do 
    { 
     new_counter = old_counter; 
     --new_counter.external_counters; 
     new_counter.internal_count += count_increase; 
    } 
    while(!ptr->count.compare_exchange_strong(old_counter, new_counter, 
              std::memory_order_acquire, 
              std::memory_order_relaxed)); 

    if(!new_counter.internal_count && !new_counter.external_counters) 
    { 
     delete ptr; 
    } 
    } 

private: 

    std::atomic<counted_node_ptr> head_; 
    std::atomic<counted_node_ptr> tail_; 

}; 

Результат теста:

Визуальный детектор утечек чтения параметров из: D: \ развития \ COMMON_UTILS \ Визуальная Leak Detector \ VLD .ini Визуальный детектор утечки версии 2.5 установлен. libcds версия 2.1.0 Тест начала 2017-Jan-31 1:19:03 Использование файла конфигурации теста: тест-debug.conf топологии системы: Логическое Количество процессоров: 4

Queue_ReaderWriter::WilliamsQueue_default    
    reader count=3 writer count=3 item count=99999... 
     Item count: 0 
     Item count: 0 
     Item count: 0 
     Post pops: 0 
     Reader 0 popped count=35822 
     Reader 1 popped count=32755 
     Reader 2 popped count=31420 
     Readers: duration=0.893811, success pop=99997, failed pops=261140 
     Writers: duration=0.841302, failed push=0 

d: \ development \ libcds \ tests \ unit \ queue \ queue_reader_writer.cpp (253): CPPUNIT_CH ECK (nTotalPops + nPostTestPops == nQueueSize: popped = 99997 должно быть 99999); Тестовая консистенция всплывающей последовательности ...

ВНИМАНИЕ: Визуальный детектор утечки обнаружил утечки памяти! ---------- Блок 116955 при 0x00DB33D0: 8 байтов ---------- Утечка хэша: 0xD835B211, количество: 1, всего 8 байт Вызов стека (TID 2836): ucrtbased.dll! malloc() f: \ dd \ vctools \ crt \ vcstartup \ src \ heap \ new_scalar.cpp (19): unit-queue_d.exe! o perator new() + 0x9 байт d: \ development \ libcds \ cds \ container \ williams_queue.h (297): unit-queue_d.exe ! cds :: container :: WilliamsQueue :: push() d: \ development \ libcds \ tests \ unit \ queue \ queue_reader_writer.cpp (85): блок-Цюй eue_d.exe очереди :: Queue_ReaderWriter :: WriterThread> :: гр EST() + 0xF байт

! 0

Затем я исправляю утечки памяти, добавляя деструктор узла с удалением данных. Но тестовые сбои все еще остаются.

Код испытания работы

namespace { 
    static size_t s_nReaderThreadCount = 4; 
    static size_t s_nWriterThreadCount = 4; 
    static size_t s_nQueueSize = 100000; // by default 4000000; 

    struct Value { 
     size_t  nNo; 
     size_t  nWriterNo; 
    }; 
} 

class Queue_ReaderWriter: public CppUnitMini::TestCase 
{ 
    template <class Queue> 
    class WriterThread: public CppUnitMini::TestThread 
    { 
    public: 
     Queue&    m_Queue; 
     double    m_fTime; 
     size_t    m_nPushFailed; 

     virtual void test() 
     { 
      size_t nPushCount = getTest().m_nThreadPushCount; 
      Value v; 
      v.nWriterNo = m_nThreadNo; 
      v.nNo = 0; 
      m_nPushFailed = 0; 

      m_fTime = m_Timer.duration(); 

      while (v.nNo < nPushCount) { 
       if (m_Queue.push(v)) { 
        ++v.nNo; 
       } 
       else 
        ++m_nPushFailed; 
      } 

      m_fTime = m_Timer.duration() - m_fTime; 
      getTest().m_nWriterDone.fetch_add(1); 
     } 
    }; 

    template <class Queue> 
    class ReaderThread: public CppUnitMini::TestThread 
    { 
    public: 
     Queue&    m_Queue; 
     double    m_fTime; 
     size_t    m_nPopEmpty; 
     size_t    m_nPopped; 
     size_t    m_nBadWriter; 

     typedef std::vector<size_t> TPoppedData; 
     std::vector<TPoppedData>  m_WriterData; 

     virtual void test() 
     { 
      m_nPopEmpty = 0; 
      m_nPopped = 0; 
      m_nBadWriter = 0; 
      const size_t nTotalWriters = s_nWriterThreadCount; 
      Value v; 

      m_fTime = m_Timer.duration(); 

      while (true) { 
       if (m_Queue.pop(v)) { 
        ++m_nPopped; 
        if (/*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters) 
         m_WriterData[ v.nWriterNo ].push_back(v.nNo); 
        else 
         ++m_nBadWriter; 
       } 
       else 
        ++m_nPopEmpty; 

       if (m_Queue.empty()) { 
        if (getTest().m_nWriterDone.load() >= nTotalWriters) { 
         CPPUNIT_MSG(" Item count: " << m_Queue.size()); 
         if (m_Queue.empty()) 
           break; 
        } 
       } 
      } 

      m_fTime = m_Timer.duration() - m_fTime; 
     } 
    }; 

protected: 
    size_t     m_nThreadPushCount; 
    atomics::atomic<size_t>  m_nWriterDone; 

protected: 
    template <class Queue> 
    void analyze(CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 ) 
    { 
     typedef ReaderThread<Queue> Reader; 
     typedef WriterThread<Queue> Writer; 

     size_t nPostTestPops = 0; 
     { 
      Value v; 
      while (testQueue.pop(v)) 
       ++nPostTestPops; 
     } 
     CPPUNIT_MSG(" Post pops: " << nPostTestPops); 

     double fTimeWriter = 0; 
     double fTimeReader = 0; 
     size_t nTotalPops = 0; 
     size_t nPopFalse = 0; 
     size_t nPoppedItems = 0; 
     size_t nPushFailed = 0; 

     std::vector< Reader * > arrReaders; 

     for (CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it) { 
      Reader * pReader = dynamic_cast<Reader *>(*it); 
      if (pReader) { 
       fTimeReader += pReader->m_fTime; 
       nTotalPops += pReader->m_nPopped; 
       nPopFalse += pReader->m_nPopEmpty; 
       arrReaders.push_back(pReader); 
       CPPUNIT_CHECK_EX(pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter); 

       size_t nPopped = 0; 
       for (size_t n = 0; n < s_nWriterThreadCount; ++n) 
        nPopped += pReader->m_WriterData[n].size(); 

       CPPUNIT_MSG(" Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped); 
       nPoppedItems += nPopped; 
      } 
      else { 
       Writer * pWriter = dynamic_cast<Writer *>(*it); 
       CPPUNIT_ASSERT(pWriter != nullptr); 
       fTimeWriter += pWriter->m_fTime; 
       nPushFailed += pWriter->m_nPushFailed; 
       if (!boost::is_base_of<cds::bounded_container, Queue>::value) { 
        CPPUNIT_CHECK_EX(pWriter->m_nPushFailed == 0, 
         "writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed); 
       } 
      } 
     } 
     CPPUNIT_CHECK_EX(nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems); 

     CPPUNIT_MSG(" Readers: duration=" << fTimeReader/s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse); 
     CPPUNIT_MSG(" Writers: duration=" << fTimeWriter/s_nWriterThreadCount << ", failed push=" << nPushFailed); 

     size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount; 
     CPPUNIT_CHECK_EX(nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize); 
     CPPUNIT_CHECK(testQueue.empty()); 
    } 

    template <class Queue> 
    void test() 
    { 
     m_nThreadPushCount = s_nQueueSize/s_nWriterThreadCount; 
     CPPUNIT_MSG(" reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount 
      << " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "..."); 

     Queue testQueue; 
     CppUnitMini::ThreadPool pool(*this); 

     m_nWriterDone.store(0); 

     // Writers must be first 
     pool.add(new WriterThread<Queue>(pool, testQueue), s_nWriterThreadCount); 
     pool.add(new ReaderThread<Queue>(pool, testQueue), s_nReaderThreadCount); 

     pool.run(); 

     analyze(pool, testQueue); 
     CPPUNIT_MSG(testQueue.statistics()); 
    } 

ответ

0

трассировки стека из VLD говорит вам, где память была выделена, но не освобождены: WilliamsQueue::push, строка 297 в вашем заголовке.

Если эта выделенная память периодически просачивается, вероятно, находится в линии old_next = new_next. Вы копируете существующий counted_node_ptr поверх пустого, выделяете некоторую новую память, тогда нет очевидного места для удаления ранее выделенной памяти.

+0

Спасибо за ваш ответ!Как уже говорилось выше утечки памяти может быть устранена путем добавления деструктор узла: 'структура NODE_TYPE { \t \t \t ~ NODE_TYPE() { \t \t \t \t удалить m_value.load(); \t \t \t} }; 'Но в этом случае утечки могут мне помочь, они показывают, что некоторые узлы были удалены из-за многопоточного взаимодействия. Но я могу проследить, почему это может быть. – FatherOctober

+0

'old_next = new_next', чтобы установить новый хвост, поскольку предыдущий имеет значение m_value! = NULL, а m_next - NULL. Эта ситуация может быть связана с двумя потоками для push(), но один из них выполняет оператор if(). Таким образом, другой поток может выполнять оператор 'old_tail.ptr-> m_next.compare_exchange_strong (old_next, new_next)' in else() быстрее, чем один. – FatherOctober