У меня есть блокирующая многопользовательская очередь, отдельная потребительская очередь, основанная на круговом буфере. До сих пор он имел только неблокирующие звонки push_back()
и pop_front()
. Теперь я хочу добавить блокирующие версии этих вызовов, но я хочу свести к минимуму влияние этого на производительность кода, использующего неблокирующие версии, а именно: он не должен превращать их в «lock-by-default" звонки.Добавление функций блокировки в незаблокированную очередь
E.g. простейший вариант блокирующего push_back() будет выглядеть следующим образом:
void push_back_Blocking(const T& pkg) {
if (!push_back(pkg)) {
unique_lock<mutex> ul(mux);
while (!push_back(pkg)) {
cv_notFull.wait(ul);
}
}
}
но, к сожалению, это также потребует поставить следующий блок в конце «неблокирующая» pop_front()
:
{
std::lock_guard<mutex> lg(mux);
cv_notFull.notify_all();
}
В то время как только notify
практически не влияет на производительность (если нить не ждет), замок имеет.
Так что мой вопрос:
Как я могу (с использованием стандартного C++ 14, если это возможно) добавить блокирование push_back
и pop_front
функции-члены в моей очереди, не сильно препятствует производительность non_blocking коллег (читай: минимизировать системные вызовы)? По крайней мере, до тех пор, пока нить не заблокирована - но в идеале даже тогда.
Для справки, моя текущая версия выглядит примерно так (я ушел из отладочные проверки, выравнивание данных и явные упорядоченности памяти):
template<class T, size_t N>
class MPSC_queue {
using INDEX_TYPE = unsigned long;
struct Idx {
INDEX_TYPE idx;
INDEX_TYPE version_cnt;
};
enum class SlotState {
EMPTY,
FILLED
};
struct Slot {
Slot() = default;
std::atomic<SlotState> state= SlotState::EMPTY;
T data{};
};
struct Buffer_t {
std::array<Slot, N> data{};
Buffer_t() {
data.fill(Slot{ SlotState::EMPTY, T{} });
}
Slot& operator[](Idx idx) {
return this->operator[](idx.idx);
}
Slot& operator[](INDEX_TYPE idx) {
return data[idx];
}
};
Buffer_t buffer;
std::atomic<Idx> head{};
std::atomic<INDEX_TYPE> tail=0;
INDEX_TYPE next(INDEX_TYPE old) { return (old + 1) % N; }
Idx next(Idx old) {
old.idx = next(old.idx);
old.version_cnt++;
return old;
}
public:
bool push_back(const T& val) {
auto tHead = head.load();
Idx wrtIdx;
do {
wrtIdx = next(tHead);
if (wrtIdx.idx == tail) {
return false;
}
} while (!head.compare_exchange_strong(tHead, wrtIdx));
buffer[wrtIdx].data = val;
buffer[wrtIdx].state = SlotState::FILLED;
return true;
}
bool pop_front(T& val) {
auto rIdx = next(tail);
if (buffer[rIdx].state != SlotState::FILLED) {
return false;
}
val = buffer[rIdx].data;
buffer[rIdx].state = SlotState::EMPTY;
tail = rIdx;
return true;
}
};
Похожие вопросы:
Я задал аналогичный вопрос конкретно об оптимизации использования condition_variable::notify
here, но вопрос закрылся как su дублируется this question.
Я не согласен, потому что этот вопрос был связан с тем, почему мьютекс необходим для переменных условий вообще (или, скорее, это эквивалент pthread) - фокусировка на condition_variable::wait
- и не может/как его можно избежать для части notify
. Но, видимо, я не сделал этого достаточно ясным (или люди просто не согласились с моим мнением).
В любом случае ответы в связанном с нами вопросе не помогли мне, и так как это в целом было XY-problem, я решил задать еще один вопрос о фактической проблеме, которую я имею, и, таким образом, предоставить более широкий спектр возможных решений (возможно, есть способ избежать переменных состояния вообще).
This question также очень похож, но
- Речь идет о C на Linux и ответы используют специфичную для платформы конструкции (PThreads и futexes)
- Автор там попросил ЭФФЕКТИВНЫЙ блокирующие вызовы, но не неблокирующих вообще. Я, с другой стороны, не забочусь об эффективности блокирующих, но хочу как можно быстрее сохранить неблокирующие.
Неверная ссылка на этот вопрос. Не могли бы вы исправить это? Кроме того, атомарные операции над элементом 'head' на самом деле не являются * блокируемыми * из-за его размера (2 * unsigned long). – Tsyvarev
@ Цыварев: Спасибо, я исправил ссылки. 'std :: atomic' IS lock-free на VS2015 (x64) (просто проверьте 'std :: atomic {} .is_lock_free()'), и насколько я знаю, gcc и clang могут сделать это даже для 128 бит (две переменные 'size_t') –
MikeMB
Ох, только что обнаружил, что современный x86_64 поддерживает двойной CAS. Неважно. – Tsyvarev