2015-11-03 5 views
2

В настоящее время я работаю над одним незакрепленным списком в C++ 11, и у меня проблема с моей функцией popFront(). Или я должен хотя бы сказать, что знаю, что в некоторых случаях у него будут проблемы.Как предотвратить неопределенное поведение и проблему ABA в этой функции стека блокировки?

В любом случае, это то, что я в настоящее время:

std::shared_ptr<T> popFront(void) 
{ 
    auto p = atomic_load(&head); 
    while(p && !atomic_compare_exchange_weak(&head, &p, p->next)) 
    {} 
    return p ? p->data : std::shared_ptr<T>(); 
} 

Обратите внимание, что head имеет тип shared_ptr.

Однако, я предвижу несколько вопросов. Первым из них является ситуация, когда два потока выполняются popFront(), они оба читают то же самое head, и один поток заканчивается первым. Перед завершением второго потока вызывающий объект удаляет объект, на который указали, поэтому второй поток теперь работает с удаленной памятью. Вторая проблема - классическая проблема ABA.

Идея этого связанного списка состоит в том, чтобы он был заблокирован, поэтому я хочу избежать наложения блокировки внутри этой функции. К сожалению, я не уверен, как решить эти проблемы. Мы ценим любые предложения.

+0

Вместо того, фиксируя этот код, [дон 't исправить этот код] (https://www.youtube.com/watch?v=CZ69OFLlBCk), напишите лучший код. Зачем вам это делать? Даже если вы исправите все это, он не будет работать лучше, чем простой замок, и он будет хрупким, трудно понятным и невозможным изменить. –

+0

Достаточно честный. Есть две причины, почему я хотел бы это исправить: (1) Я хотел бы узнать, как изменить код для решения проблем, и (2) существующие реализации не включают необходимые мне возможности. – Mlagma

+0

Затем используйте замок. Гораздо проще писать, понимать и поддерживать. В большинстве случаев он работает лучше. И вы действительно можете убедиться, что это правильно и не будет навсегда беспокоиться о том, что вы пропустили какой-то причудливый край, в котором он может потерпеть неудачу. –

ответ

0

Существует множество решений для проектирования без блокировки очереди без проблемы с ABA.

Этот article должен предоставить пару соображений и некоторые общие инструменты для решения этих проблем можно найти here.

Теперь на описанных проблем, которые вы упомянули:

Перед второй поток завершается, вызывающий удаляет объект, который в настоящее время указал на, так что второй поток теперь работает с удаленной памяти

Да, возможно, и решение для этого состоит в использовании tagged pointers: на 32-битных архитектурах последние 2 (or more) бита не используются, поэтому их можно использовать для тегов, а на 64-битных архитектурах мы имеем по крайней мере 3 неиспользуемых бита.

Таким образом, мы можем установить, как логически удалили указатель, но не физически удалить его, установив некоторые неиспользуемые биты указателя, например:

__inline struct node* setTag(struct node* p, unsigned long TAG) 
{ 
    return (struct node*) ((uintptr_t)p | TAG); 
} 

__inline bool isTagged(struct node* p, unsigned long TAG) 
{ 
    return (uintptr_t)p == (uintptr_t)p & ~TAG; 
} 

__inline struct node* getUntaggedAddress(struct node* p, unsigned long TAG) 
{ 
    return (struct node*)((uintptr_t)p & ~TAG); 
} 

где TAG до 4 (для 32 разрядных архитектур) и до 8 на 64-битных архитектурах (2/3 или более неиспользуемых бита в зависимости от архитектуры компьютера и выравнивания слов).

Теперь, когда мы делаем CAS, мы игнорируем тегированные указатели =>, действуя только на действительных указателях.

При выполнении DEQUEUE на очереди можно выполнить следующим образом:

int dequeue(qroot* root) 
{ 
    qnode* oldHead; 

    do 
    { 
     oldHead = root->head; 

     if (isTagged(root->head)) //disregard tagged addresses 
      return NULL; 

     oldHead = getUntaggedAddress(root->head); //we do a CAS only if the old head was unchanged 

    } while (root->head.compare_exchange_strong(oldHead, oldHead->next, std::memory_order_seq_cst)); 

    return &(oldHead->data); 
} 

дал

typedef struct qnode 
{ 
    std::atomic<qnode*> next; 
    int data; 
}qnode; 

typedef struct qroot 
{ 
    std::atomic<qnode*> head; //Dequeue and peek will be performed from head 
    std::atomic<qnode*> tail; //Enqueue will be performed to tail 
}qroot; 
0

Одна вещь, которая помогает сделать нити намного проще, не освобождая память. Если вы работаете с узлом связанных узлов списка, вы можете рассмотреть их пул. Вместо освобождения узла вы возвращаете его в пул. Это касается части вашей проблемы.

ABA легко. Вам нужно вызывать счетчик каждый раз, когда вы меняете указатель на голову. Вы должны атомарно писать этот счетчик с указателем в одно и то же время. Если вы используете 32-разрядную адресацию, используйте 64-разрядное сравнение и обмен (CAS) и сохраните счетчик в дополнительных 32 битах. Если вы используете 64-разрядную адресацию, избегайте 128-битного сравнения и свопинга, потому что это может быть медленным (на наших чипах Xenon все до 64 бит работает быстро). Поскольку Windows и Linux обе не поддерживают полную 64-разрядную адресацию, вы можете избавиться от использования некоторых из 64 бит для ABA. Я использую объединения для этого как для 32 и 64-разрядных режимов адресации.

Вам не нужно учитывать каждое изменение. Вам просто нужно поймать каждое изменение. Даже когда вы пытаетесь заставить ABA с большим количеством потоков менять голову как можно быстрее, это будет редко случаться. В реальной жизни редко бывает редко. То есть вам не нужно считать очень высоким. Обычно я использую 4 бита и позволяю ему катиться. У меня могут быть проблемы. Используйте больше, если хотите.

В этом примере, я предположил, 64 бит, и использовал CAS() для сравнения и обмена, так что вам придется заменить все, что ваш компилятор использует для CAS:

typedef unsigned __int64_t U8; 

struct TNode { 
    TNode* m_pNext; 
}; 

template<class T> 
union THead { 
    struct { 
     U8 m_nABA : 4, 
      m_pNode:60; // Windows only supports 44 bits addressing anyway. 
    }; 
    U8 m_n64; // for CAS 
    // this constructor will make an atomic copy on intel 
    THead(THead& r)   { m_n64 = r.m_n64; } 
    T* Node()    { return (T*)m_pNode; } 
    // changeing Node bumps aba 
    void Node(T* p)   { m_nABA++; m_pNode = (U8)p; return this; } 
}; 

// pop pNode from head of list. 
template<class T> 
T* Pop(volatile THead<T>& Head) { 
    while (1) { // race loop 
     // Get an atomic copy of head and call it old. 
     THead<T> Old(Head); 
     if (!Old.Node()) 
      return NULL; 
     // Copy old and call it new. 
     THead<T> New(Old); 
     // change New's Node, which bumps internal aba 
     New.Node(Old.Node()->m_pNext); 
     // compare and swap New with Head if it still matches Old. 
     if (CAS(&Head.m_n64, Old.m_n64, New.m_n64)) 
      return Old.Node(); // success 
     // race, try again 
    } 
} 

// push pNode onto head of list. 
template<class T> 
void Push(volatile THead<T>& Head, T* pNode) { 
    while (1) { // race loop 
     // Get an atomic copy of head and call it old. 
     // Copy old and call it new. 
     THead<T> Old(Head), New(Old); 
     // Wire node t Head 
     pNode->m_pNext = New.Node(); 
     // change New's head ptr, which bumps internal aba 
     New.Node(pNode); 
     // compare and swap New with Head if it still matches Old. 
     if (CAS(&Head.m_n64, Old.m_n64, New.m_n64)) 
      break; // success 
     // race, try again 
    } 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^