2013-10-26 3 views
1

Я пишу безблокировочного двусвязного список, основанный на этих работах:атомарные операции для безблокировочного двусвязному список

«Эффективная и надежная блокировка свободной Рекультивация памяти на основе Reference Counting» Андерс Gidenstam, член , IEEE, Марина Papatriantafilou, ˙h акан Санделл и Philippas Tsigas

«Блокировка-двусторонние очереди бесплатно и дважды связанные списки» Хакан Санделл, Philippas Tsigas

Для этого вопроса мы можем отложить первый документ.

В этой статье они используют умный способ хранения флага удаления и указателя в слове. (Подробнее here)

Псевдо-код для этого раздела в газете:

union Link 
    : word 
    (p,d): {pointer to Node, boolean} 

structure Node 
    value: pointer to word 
    prev: union Link 
    next: union Link 

И мой код выше псевдокода:

template< typename NodeT > 
struct LockFreeLink 
{ 
public: 
    typedef NodeT NodeType; 

private: 

protected: 
    std::atomic< NodeT* > mPointer; 

public: 
    bcLockFreeLink() 
    { 
     std::atomic_init(&mPointer, nullptr); 
    } 
    ~bcLockFreeLink() {} 

    inline NodeType* getNode() const throw() 
    { 
     return std::atomic_load(&mPointer, std::memory_order_relaxed); 
    } 
    inline std::atomic< NodeT* >* getAtomicNode() const throw() 
    { 
     return &mPointer; 
    } 
}; 

struct Node : public LockFreeNode 
{ 
    struct Link : protected LockFreeLink<Node> 
    { 
     static const int dMask = 1; 
     static const int ptrMask = ~dMask; 

     Link() { } throw() 
     Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw() 
     { 
      std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
     } 

     Node* pointer() const throw() 
     { 
      return reinterpret_cast<Node*>(
       std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
     } 
     bool del() const throw() 
     { 
      return std::atomic_load(&data, std::memory_order_relaxed) & dMask; 
     } 
     bool compareAndSwap(const Link& pExpected, const Link& pNew) throw() 
     { 
      Node* lExpected = std::atomic_load(&pExpected.mPointer, std::memory_order_relaxed); 
      Node* lNew = std::atomic_load(&pNew.mPointer, std::memory_order_relaxed); 

      return std::atomic_compare_exchange_strong_explicit(
       &mPointer, 
       &lExpected, 
       lNew, 
       std::memory_order_relaxed, 
       std::memory_order_relaxed); 
     } 

     bool operator==(const Link& pOther) throw() 
     { 
      return std::atomic_load(data, std::memory_order_relaxed) == 
       std::atomic_load(pOther.data, std::memory_order_relaxed); 
     } 
     bool operator!=(const Link& pOther) throw() 
     { 
      return !operator==(pOther); 
     } 
    }; 

    Link mPrev; 
    Link mNext; 
    Type mData; 

    Node() {}; 
    Node(const Type& pValue) : mData(pValue) {}; 
}; 

В этой статье есть эта функция для набора метка удаления ссылки ссылается на true:

procedure SetMark(link: pointer to pointer to Node) 
    while true do 
     node = *link; 
     if node.d = true or CAS(link, node, (node.p, true)) then break; 

И мой код для этой функции:

void _setMark(Link* pLink) 
{ 
    while (bcTRUE) 
    { 
     Link lOld = *pLink; 
     if(pLink->del() || pLink->compareAndSwap(lOld, Link(pLink->pointer(), bcTRUE))) 
      break; 
    } 
} 

Но моя проблема заключается в compareAndSwap функции, где я должен сравнить и поменять три атомные переменные. Информация о проблеме является here

(на самом деле new переменными сравнения и функция подкачки не важна, потому что нить местным)

Теперь моим вопрос: как я могу написать функцию, чтобы сравнить сравнение с обменом и своп три атомного varialbe или где я делаю ошибку?

(Извините за длинный вопрос)

Edit:

похожей проблема в менеджер памяти бумаге:

function CompareAndSwapRef(link:pointer to pointer toNode, 
old:pointer toNode, new:pointer toNode):boolean 
    if CAS(link,old,new) then 
     if new=NULL then 
      FAA(&new.mmref,1); 
      new.mmtrace:=false; 
    if old=NULLthen FAA(&old.mmref,-1); 
    return true; 
return false; 

здесь я должен сравнить и своп три атомных переменный. (Обратите внимание, что мои аргументы типа Link и я должен сравнить и поменять mPointer из Link)

ответ

2

Если вы не можете сделать ваши три элемента данных, которые вы сравниваете/подкачка впишется в два элемента указателя размера, вы не можете делайте это с помощью сравнения и свопинга (конечно, не на x86, и я не слышал о какой-либо другой машинной архитектуре, которая имеет такую ​​вещь).

Если вы полагаетесь на данные, хранящиеся на адресе, который (по крайней мере) выровнен по четному байтовому адресу, вы можете потенциально использовать побитовое ИЛИ, чтобы установить младший бит при удалении элемента.Раньше люди использовали верхние части адреса для хранения дополнительных данных, но по крайней мере в x86-64 это невозможно, так как верхняя часть адреса должна быть «канонической», что означает, что любые адресные биты выше «допустимого предела» (определяемого архитектурой процессора, в настоящее время это 48 бит), все должны быть такими же, как старший бит используемого предела (так же, как бит 47).

Edit: Эта часть кода делает именно то, что я описываю:

static const int dMask = 1; 
    static const int ptrMask = ~dMask; 

    Link() { } throw() 
    Link(const Node* pPointer, bcBOOL pDel = bcFALSE) throw() 
    { 
     std::atomic_init(&mPointer, (reinterpret_cast<int>(pPointer) | (int)pDel)); 
    } 

    Node* pointer() const throw() 
    { 
     return reinterpret_cast<Node*>(
      std::atomic_load(&data, std::memory_order_relaxed) & ptrMask); 
    } 

Он использует младший бит для хранения pDel флаг.

Вы должны быть в состоянии сделать это для двойного списка, используя форму cmpxchg16b (на x86). В системе Windows это будет _InterlockedCompareExchange128. В gcc (для ОС типа Unix, например Linux/MacOS) вам нужно сначала построить int128 из двух указателей. Если вы компилируете для 32-битного кода, вам, вероятно, понадобится сделать 64-битный int для ОС Windows и Unix.

+0

Я думаю, что все еще нормально использовать верхний бит, если вы маскируете логическое значение, прежде чем пытаться использовать указатель. Хотя технически это было бы неопределенное поведение. –

+0

Ваш разговор о побитом ИЛИ правилен, но, в общем, я хочу использовать CAS. – MRB

+0

@AlanStokes: это не нормально на x86-64. Все биты над «допустимыми» битами должны быть правильными значениями (такое же значение, как бит 47). Конечно, вы можете использовать высокий бит, но тогда вам нужно найти способ установить его на правильное одно или нулевое значение на основе бит 47, который имеет тенденцию становиться довольно грязным. –

2

http://www.drdobbs.com/cpp/lock-free-code-a-false-sense-of-security/210600279

Но замена замков оптом, написав свой собственный безблокировочного код является не ответ. Код без блокировки имеет два основных недостатка. Во-первых, это , не очень полезный для решения типичных проблем - множество базовых данных структур, даже двусвязных списков, до сих пор не известны без блокировки реализация. Исходя из новой или улучшенной информации, свободной от блокировки, структура по-прежнему будет зарабатывать, по крайней мере, опубликованную бумагу в журнале , а иногда и на степени.

Я не думаю, что было бы достаточно эффективно использовать его, но в любом случае это интересно читать.

+0

Благодарим вас за ссылку. – MRB

2

На x64 используются только 44 бит адресного пространства. Если ваши указатели выровнены по 8 байт, вы используете только 41 бит. 41x2 все еще слишком велико для 64 бит. Есть 128-битный сравнение и своп, хотя я не могу ручаться за его скорость. Я всегда стараюсь использовать 64-битный.

Возможно, вам нужно всего до 2 миллиардов узлов. Итак, что вы можете сделать, это предварительно выделить пул узлов, из которых извлекается список. Вы создаете узлы, захватывая следующий индекс бесплатного пула, конечно, используя атомарные операции. Затем вместо следующего и пред-указателей они могут быть 31-битными индексами в пуле узлов, и у вас осталось 2 бита для флажков удаления. Предполагая, что вам не нужны 2 миллиарда узлов, у вас осталось еще больше бит. Единственный недостаток - вы должны знать, сколько узлов вам понадобится при запуске, хотя вы могли бы перераспределить узлы, если бы вы тоже.

Что я сделал, это использовать функции виртуальной памяти для резервирования ГБ адресного пространства, а затем сопоставить физический барабан в этом пространстве, поскольку он мне нужен, чтобы расширить мой пул, не перераспределяя его.