Я реализую lock-free queue, описанный в «C++ Concurrency in Action» Энтони Уильямса. Я тестирую его как новый контейнер libcds. Тесты на поп и толчок работают нормально. Но несколько продюсеров, множественное тестирование потребителя иногда случается. VLD (или Intel Inspector XE или ASan) показывает утечку памяти. Я исправляю это, добавляя деструктор узла, но проблема с присутствием всех элементов остается. Как я могу решить эту проблему? Благодарю.Удаление узла узла без блокировки Энтони Уильямса
Williams безблокировочного очередь:
#include <memory>
template <class T>
class williams_queue
{
public:
williams_queue()
{
counted_node_ptr counted_node;
counted_node.ptr = new node;
counted_node.external_count = 1;
head_.store(counted_node);
tail_.store(head_);
}
williams_queue(const lock_free_queue_mpmc& other) = delete;
williams_queue& operator=(const lock_free_queue_mpmc& other) = delete;
~williams_queue()
{
counted_node_ptr old_head = head_.load();
while (node* const old_node = old_head.ptr)
{
head_.store(old_node->next);
delete old_node;
old_head = head_.load();
}
}
void push(const T& new_value)
{
std::unique_ptr<T> new_data(new T(new_value));
counted_node_ptr new_next;
new_next.ptr = new node;
new_next.external_count = 1;
counted_node_ptr old_tail = tail_.load();
while (true)
{
increase_external_count(tail_, old_tail);
T* old_data = nullptr;
if (old_tail.ptr->data.compare_exchange_strong(old_data, new_data.get()))
{
counted_node_ptr old_next = {0};
if (!old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
{
delete new_next.ptr;
new_next = old_next;
}
set_new_tail(old_tail, new_next);
new_data.release();
break;
}
else
{
counted_node_ptr old_next = {0};
if(old_tail.ptr->next.compare_exchange_strong(old_next, new_next))
{
old_next = new_next;
new_next.ptr = new node;
}
set_new_tail(old_tail, old_next);
}
}
}
bool pop(Func f)
{
counted_node_ptr old_head = head_.load(std::memory_order_relaxed);
while (true)
{
increase_external_count(head_, old_head);
node* const ptr = old_head.ptr;
if(ptr == tail_.load().ptr)
{
release_ref(p);
return false;
}
counted_node_ptr next = ptr->next.load();
if (head_.compare_exchange_strong(old_head,next))
{
T* const res = ptr->data.exchange(nullptr);
free_external_counter(old_head);
f(res.get());
return true;
}
ptr->release_ref();
}
}
private:
struct node;
struct counted_node_ptr
{
int external_count;
node* ptr;
};
struct node_counter
{
unsigned internal_count : 30;
unsigned external_counters : 2;
};
struct node
{
std::atomic<T*> data;
std::atomic<node_counter> count;
std::atomic<counted_node_ptr> next;
node()
{
node_counter new_count;
new_count.internal_count = 0;
new_count.external_counters = 2;
count.store(new_count);
counted_node_ptr new_next;
new_next.ptr = nullptr;
new_next.external_count = 0;
next.store(new_next);
}
};
static void release_ref(node * p)
{
node_counter old_counter = p->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter=old_counter;
--new_counter.internal_count;
}
while(!p->count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if(!new_counter.internal_count && !new_counter.external_counters)
{
delete p;
}
}
private:
void set_new_tail(counted_node_ptr& old_tail,
const counted_node_ptr& new_tail)
{
node* const current_tail_ptr = old_tail.ptr;
while (!tail_.compare_exchange_weak(old_tail, new_tail) &&
old_tail.ptr == current_tail_ptr);
if(old_tail.ptr == current_tail_ptr)
{
free_external_counter(old_tail);
}
else
{
release_ref(current_tail_ptr);
}
}
static void increase_external_count(std::atomic<counted_node_ptr>& counter,
counted_node_ptr& old_counter)
{
counted_node_ptr new_counter;
do
{
new_counter = old_counter;
++new_counter.external_count;
}
while(!counter.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
old_counter.external_count = new_counter.external_count;
}
static void free_external_counter(counted_node_ptr& old_node_ptr)
{
node* const ptr = old_node_ptr.ptr;
const int count_increase = old_node_ptr.external_count - 2;
node_counter old_counter= ptr->count.load(std::memory_order_relaxed);
node_counter new_counter;
do
{
new_counter = old_counter;
--new_counter.external_counters;
new_counter.internal_count += count_increase;
}
while(!ptr->count.compare_exchange_strong(old_counter, new_counter,
std::memory_order_acquire,
std::memory_order_relaxed));
if(!new_counter.internal_count && !new_counter.external_counters)
{
delete ptr;
}
}
private:
std::atomic<counted_node_ptr> head_;
std::atomic<counted_node_ptr> tail_;
};
Результат теста:
! 0Визуальный детектор утечек чтения параметров из: D: \ развития \ COMMON_UTILS \ Визуальная Leak Detector \ VLD .ini Визуальный детектор утечки версии 2.5 установлен. libcds версия 2.1.0 Тест начала 2017-Jan-31 1:19:03 Использование файла конфигурации теста: тест-debug.conf топологии системы: Логическое Количество процессоров: 4
Queue_ReaderWriter::WilliamsQueue_default reader count=3 writer count=3 item count=99999... Item count: 0 Item count: 0 Item count: 0 Post pops: 0 Reader 0 popped count=35822 Reader 1 popped count=32755 Reader 2 popped count=31420 Readers: duration=0.893811, success pop=99997, failed pops=261140 Writers: duration=0.841302, failed push=0
d: \ development \ libcds \ tests \ unit \ queue \ queue_reader_writer.cpp (253): CPPUNIT_CH ECK (nTotalPops + nPostTestPops == nQueueSize: popped = 99997 должно быть 99999); Тестовая консистенция всплывающей последовательности ...
ВНИМАНИЕ: Визуальный детектор утечки обнаружил утечки памяти! ---------- Блок 116955 при 0x00DB33D0: 8 байтов ---------- Утечка хэша: 0xD835B211, количество: 1, всего 8 байт Вызов стека (TID 2836): ucrtbased.dll! malloc() f: \ dd \ vctools \ crt \ vcstartup \ src \ heap \ new_scalar.cpp (19): unit-queue_d.exe! o perator new() + 0x9 байт d: \ development \ libcds \ cds \ container \ williams_queue.h (297): unit-queue_d.exe ! cds :: container :: WilliamsQueue :: push() d: \ development \ libcds \ tests \ unit \ queue \ queue_reader_writer.cpp (85): блок-Цюй eue_d.exe очереди :: Queue_ReaderWriter :: WriterThread> :: гр EST() + 0xF байт
Затем я исправляю утечки памяти, добавляя деструктор узла с удалением данных. Но тестовые сбои все еще остаются.
Код испытания работы
namespace {
static size_t s_nReaderThreadCount = 4;
static size_t s_nWriterThreadCount = 4;
static size_t s_nQueueSize = 100000; // by default 4000000;
struct Value {
size_t nNo;
size_t nWriterNo;
};
}
class Queue_ReaderWriter: public CppUnitMini::TestCase
{
template <class Queue>
class WriterThread: public CppUnitMini::TestThread
{
public:
Queue& m_Queue;
double m_fTime;
size_t m_nPushFailed;
virtual void test()
{
size_t nPushCount = getTest().m_nThreadPushCount;
Value v;
v.nWriterNo = m_nThreadNo;
v.nNo = 0;
m_nPushFailed = 0;
m_fTime = m_Timer.duration();
while (v.nNo < nPushCount) {
if (m_Queue.push(v)) {
++v.nNo;
}
else
++m_nPushFailed;
}
m_fTime = m_Timer.duration() - m_fTime;
getTest().m_nWriterDone.fetch_add(1);
}
};
template <class Queue>
class ReaderThread: public CppUnitMini::TestThread
{
public:
Queue& m_Queue;
double m_fTime;
size_t m_nPopEmpty;
size_t m_nPopped;
size_t m_nBadWriter;
typedef std::vector<size_t> TPoppedData;
std::vector<TPoppedData> m_WriterData;
virtual void test()
{
m_nPopEmpty = 0;
m_nPopped = 0;
m_nBadWriter = 0;
const size_t nTotalWriters = s_nWriterThreadCount;
Value v;
m_fTime = m_Timer.duration();
while (true) {
if (m_Queue.pop(v)) {
++m_nPopped;
if (/*v.nWriterNo >= 0 &&*/ v.nWriterNo < nTotalWriters)
m_WriterData[ v.nWriterNo ].push_back(v.nNo);
else
++m_nBadWriter;
}
else
++m_nPopEmpty;
if (m_Queue.empty()) {
if (getTest().m_nWriterDone.load() >= nTotalWriters) {
CPPUNIT_MSG(" Item count: " << m_Queue.size());
if (m_Queue.empty())
break;
}
}
}
m_fTime = m_Timer.duration() - m_fTime;
}
};
protected:
size_t m_nThreadPushCount;
atomics::atomic<size_t> m_nWriterDone;
protected:
template <class Queue>
void analyze(CppUnitMini::ThreadPool& pool, Queue& testQueue, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
{
typedef ReaderThread<Queue> Reader;
typedef WriterThread<Queue> Writer;
size_t nPostTestPops = 0;
{
Value v;
while (testQueue.pop(v))
++nPostTestPops;
}
CPPUNIT_MSG(" Post pops: " << nPostTestPops);
double fTimeWriter = 0;
double fTimeReader = 0;
size_t nTotalPops = 0;
size_t nPopFalse = 0;
size_t nPoppedItems = 0;
size_t nPushFailed = 0;
std::vector< Reader * > arrReaders;
for (CppUnitMini::ThreadPool::iterator it = pool.begin(); it != pool.end(); ++it) {
Reader * pReader = dynamic_cast<Reader *>(*it);
if (pReader) {
fTimeReader += pReader->m_fTime;
nTotalPops += pReader->m_nPopped;
nPopFalse += pReader->m_nPopEmpty;
arrReaders.push_back(pReader);
CPPUNIT_CHECK_EX(pReader->m_nBadWriter == 0, "reader " << pReader->m_nThreadNo << " bad writer event count=" << pReader->m_nBadWriter);
size_t nPopped = 0;
for (size_t n = 0; n < s_nWriterThreadCount; ++n)
nPopped += pReader->m_WriterData[n].size();
CPPUNIT_MSG(" Reader " << pReader->m_nThreadNo - s_nWriterThreadCount << " popped count=" << nPopped);
nPoppedItems += nPopped;
}
else {
Writer * pWriter = dynamic_cast<Writer *>(*it);
CPPUNIT_ASSERT(pWriter != nullptr);
fTimeWriter += pWriter->m_fTime;
nPushFailed += pWriter->m_nPushFailed;
if (!boost::is_base_of<cds::bounded_container, Queue>::value) {
CPPUNIT_CHECK_EX(pWriter->m_nPushFailed == 0,
"writer " << pWriter->m_nThreadNo << " push failed count=" << pWriter->m_nPushFailed);
}
}
}
CPPUNIT_CHECK_EX(nTotalPops == nPoppedItems, "nTotalPops=" << nTotalPops << ", nPoppedItems=" << nPoppedItems);
CPPUNIT_MSG(" Readers: duration=" << fTimeReader/s_nReaderThreadCount << ", success pop=" << nTotalPops << ", failed pops=" << nPopFalse);
CPPUNIT_MSG(" Writers: duration=" << fTimeWriter/s_nWriterThreadCount << ", failed push=" << nPushFailed);
size_t nQueueSize = m_nThreadPushCount * s_nWriterThreadCount;
CPPUNIT_CHECK_EX(nTotalPops + nPostTestPops == nQueueSize, "popped=" << nTotalPops + nPostTestPops << " must be " << nQueueSize);
CPPUNIT_CHECK(testQueue.empty());
}
template <class Queue>
void test()
{
m_nThreadPushCount = s_nQueueSize/s_nWriterThreadCount;
CPPUNIT_MSG(" reader count=" << s_nReaderThreadCount << " writer count=" << s_nWriterThreadCount
<< " item count=" << m_nThreadPushCount * s_nWriterThreadCount << "...");
Queue testQueue;
CppUnitMini::ThreadPool pool(*this);
m_nWriterDone.store(0);
// Writers must be first
pool.add(new WriterThread<Queue>(pool, testQueue), s_nWriterThreadCount);
pool.add(new ReaderThread<Queue>(pool, testQueue), s_nReaderThreadCount);
pool.run();
analyze(pool, testQueue);
CPPUNIT_MSG(testQueue.statistics());
}
Спасибо за ваш ответ!Как уже говорилось выше утечки памяти может быть устранена путем добавления деструктор узла: 'структура NODE_TYPE { \t \t \t ~ NODE_TYPE() { \t \t \t \t удалить m_value.load(); \t \t \t} }; 'Но в этом случае утечки могут мне помочь, они показывают, что некоторые узлы были удалены из-за многопоточного взаимодействия. Но я могу проследить, почему это может быть. – FatherOctober
'old_next = new_next', чтобы установить новый хвост, поскольку предыдущий имеет значение m_value! = NULL, а m_next - NULL. Эта ситуация может быть связана с двумя потоками для push(), но один из них выполняет оператор if(). Таким образом, другой поток может выполнять оператор 'old_tail.ptr-> m_next.compare_exchange_strong (old_next, new_next)' in else() быстрее, чем один. – FatherOctober