2015-09-21 5 views
3

У меня есть блокирующая многопользовательская очередь, отдельная потребительская очередь, основанная на круговом буфере. До сих пор он имел только неблокирующие звонки push_back() и pop_front(). Теперь я хочу добавить блокирующие версии этих вызовов, но я хочу свести к минимуму влияние этого на производительность кода, использующего неблокирующие версии, а именно: он не должен превращать их в «lock-by-default" звонки.Добавление функций блокировки в незаблокированную очередь

E.g. простейший вариант блокирующего push_back() будет выглядеть следующим образом:

void push_back_Blocking(const T& pkg) { 
    if (!push_back(pkg)) { 
     unique_lock<mutex> ul(mux); 
     while (!push_back(pkg)) { 
      cv_notFull.wait(ul); 
     } 
    } 
} 

но, к сожалению, это также потребует поставить следующий блок в конце «неблокирующая» pop_front():

{ 
    std::lock_guard<mutex> lg(mux); 
    cv_notFull.notify_all(); 
} 

В то время как только notify практически не влияет на производительность (если нить не ждет), замок имеет.

Так что мой вопрос:
Как я могу (с использованием стандартного C++ 14, если это возможно) добавить блокирование push_back и pop_front функции-члены в моей очереди, не сильно препятствует производительность non_blocking коллег (читай: минимизировать системные вызовы)? По крайней мере, до тех пор, пока нить не заблокирована - но в идеале даже тогда.


Для справки, моя текущая версия выглядит примерно так (я ушел из отладочные проверки, выравнивание данных и явные упорядоченности памяти):

template<class T, size_t N> 
class MPSC_queue { 
    using INDEX_TYPE = unsigned long; 
    struct Idx { 
     INDEX_TYPE idx; 
     INDEX_TYPE version_cnt; 
    }; 
    enum class SlotState { 
     EMPTY, 
     FILLED 
    }; 
    struct Slot { 
     Slot() = default;    
     std::atomic<SlotState> state= SlotState::EMPTY; 
     T data{}; 
    }; 
    struct Buffer_t { 
     std::array<Slot, N> data{}; 
     Buffer_t() { 
      data.fill(Slot{ SlotState::EMPTY, T{} }); 
     } 
     Slot& operator[](Idx idx) { 
      return this->operator[](idx.idx); 
     } 
     Slot& operator[](INDEX_TYPE idx) { 
      return data[idx];     
     } 
    }; 

    Buffer_t buffer; 
    std::atomic<Idx> head{}; 
    std::atomic<INDEX_TYPE> tail=0; 

    INDEX_TYPE next(INDEX_TYPE old) { return (old + 1) % N; } 

    Idx next(Idx old) { 
     old.idx = next(old.idx); 
     old.version_cnt++; 
     return old; 
    } 
public:  
    bool push_back(const T& val) { 
     auto tHead = head.load(); 
     Idx wrtIdx; 
     do { 
      wrtIdx = next(tHead); 
      if (wrtIdx.idx == tail) { 
       return false; 
      } 
     } while (!head.compare_exchange_strong(tHead, wrtIdx)); 

     buffer[wrtIdx].data = val; 
     buffer[wrtIdx].state = SlotState::FILLED; 
     return true; 
    } 

    bool pop_front(T& val) {     
     auto rIdx = next(tail); 
     if (buffer[rIdx].state != SlotState::FILLED) { 
      return false; 
     } 
     val = buffer[rIdx].data; 
     buffer[rIdx].state = SlotState::EMPTY; 
     tail = rIdx; 
     return true; 
    } 
}; 

Похожие вопросы:

Я задал аналогичный вопрос конкретно об оптимизации использования condition_variable::notifyhere, но вопрос закрылся как su дублируется this question.
Я не согласен, потому что этот вопрос был связан с тем, почему мьютекс необходим для переменных условий вообще (или, скорее, это эквивалент pthread) - фокусировка на condition_variable::wait - и не может/как его можно избежать для части notify. Но, видимо, я не сделал этого достаточно ясным (или люди просто не согласились с моим мнением).

В любом случае ответы в связанном с нами вопросе не помогли мне, и так как это в целом было XY-problem, я решил задать еще один вопрос о фактической проблеме, которую я имею, и, таким образом, предоставить более широкий спектр возможных решений (возможно, есть способ избежать переменных состояния вообще).

This question также очень похож, но

  1. Речь идет о C на Linux и ответы используют специфичную для платформы конструкции (PThreads и futexes)
  2. Автор там попросил ЭФФЕКТИВНЫЙ блокирующие вызовы, но не неблокирующих вообще. Я, с другой стороны, не забочусь об эффективности блокирующих, но хочу как можно быстрее сохранить неблокирующие.
+0

Неверная ссылка на этот вопрос. Не могли бы вы исправить это? Кроме того, атомарные операции над элементом 'head' на самом деле не являются * блокируемыми * из-за его размера (2 * unsigned long). – Tsyvarev

+0

@ Цыварев: Спасибо, я исправил ссылки. 'std :: atomic ' IS lock-free на VS2015 (x64) (просто проверьте 'std :: atomic {} .is_lock_free()'), и насколько я знаю, gcc и clang могут сделать это даже для 128 бит (две переменные 'size_t') – MikeMB

+0

Ох, только что обнаружил, что современный x86_64 поддерживает двойной CAS. Неважно. – Tsyvarev

ответ

2

Если есть потенциал официанта на условной переменной, вы имеют заблокировать мьютекс для notify_all вызова.

Дело в том, что проверка состояния (!push_back(pkg)) выполняется перед тем ожидания на состояние переменной (C++ 11 не предусматривает другого способа). Таким образом, мьютекс - это единственное средство, которое может гарантировать стабильность между этими действиями.

Но можно исключить блокировку (и уведомление) в случае, если потенциальный официант не задействован. Просто используйте additinal флаг:

class MPSC_queue { 
    ... // Original definitions 
    std::atomic<bool> has_waiters; 

public: 
    void push_back_Blocking(const T& pkg) { 
     if (!push_back(pkg)) { 
      unique_lock<mutex> ul(mux); 
      has_waiters.store(true, std::memory_order_relaxed); // #1 
      while (!push_back(pkg)) { // #2 inside push_back() method 
       cv_notFull.wait(ul); 
       // Other waiter may clean flag while we wait. Set it again. Same as #1. 
       has_waiters.store(true, std::memory_order_relaxed); 
      } 
      has_waiters.store(false, std::memory_order_relaxed); 
     } 
    } 

    // Method is same as original, exposed only for #2 mark. 
    bool push_back(const T& val) { 
     auto tHead = head.load(); 
     Idx wrtIdx; 
     do { 
      wrtIdx = next(tHead); 
      if (wrtIdx.idx == tail) { // #2 
       return false; 
      } 
     } while (!head.compare_exchange_strong(tHead, wrtIdx)); 

     buffer[wrtIdx].data = val; 
     buffer[wrtIdx].state = SlotState::FILLED; 
     return true; 
    } 

    bool pop_front(T& val) { 
     // Main work, same as original pop_front, exposed only for #3 mark. 
     auto rIdx = next(tail); 
     if (buffer[rIdx].state != SlotState::FILLED) { 
      return false; 
     } 
     val = buffer[rIdx].data; 
     buffer[rIdx].state = SlotState::EMPTY; 
     tail = rIdx; // #3 

     // Notification part 
     if(has_waiters.load(std::memory_order_relaxed)) // #4 
     { 
      // There are potential waiters. Need to lock. 
      std::lock_guard<mutex> lg(mux); 
      cv_notFull.notify_all(); 
     } 

     return true; 
    } 
}; 

Ключевые отношения здесь являются:

  1. Установка флага на #1 и чтение tail для проверки состояния в #2.
  2. Хранение tail по телефону #3 и флаг проверки по адресу #4.

Оба эти отношения должны предоставить какой-то универсальный заказ. То есть #1 следует наблюдать до #2 даже другими разделами. То же самое для #3 и #4.

В этом случае один может гарантировать, что, если проверка флагу #4 обнаружил, что не установлено, то возможно futher проверка состояния #2 будет найдена эффект изменения состояния #3. Поэтому безопасно не блокировать (и уведомлять), потому что официант невозможен.

В текущей реализации универсального порядка между #1 и #2 обеспечивается путем загрузки tail с неявным memory_order_seq_cst. Тот же порядок между #3 и #4 предоставляется путем хранения tail с неявным memory_order_seq_cst.

В этом подходе «Не блокировать, если нет официантов», универсальный заказ - самая сложная часть. В обоих отношениях это Читать после записи заказ, которого не может быть достигнуто ни с одной комбинацией memory_order_acquire и memory_order_release. Таким образом, следует использовать memory_order_seq_cst.

+0

Я не понимал, вы могли бы свести к минимуму количество барьеров памяти, что очень много! Большое спасибо! Однако, я думаю, 'has_waiters' должен быть счетчиком вместо bool, потому что в настоящее время - если я чего-то не упускаю, вы безоговорочно устанавливаете флаг в false, даже если потенциально могут быть другие потоки. – MikeMB

+0

Возможно, использование счетчика вместо флага было бы более естественным. Но здесь 'notify_all' пробудит * всех * официантов. И после пробуждения, официант переворачивает флаг и снова проверяет условие (см. Описание строки после '.wait()' call). – Tsyvarev

+0

Правильно, я этого не видел - хотя я бы предпочел вместо этого использовать счетчик и 'notify_one', я должен сделать некоторые измерения. Btw: Я только что понял, в моей очереди есть серьезная ошибка: «Slot :: state» не является атомарным! :( – MikeMB

 Смежные вопросы

  • Нет связанных вопросов^_^