2009-02-12 8 views
2

Я пытаюсь написать потокобезопасную очередь, используя pthreads в C++. Моя программа работает в 93% случаев. Другие 7% времени, когда другие выливают мусор, ИЛИ, похоже, засыпают. Мне интересно, есть ли какая-то ошибка в моей очереди, где контекст-коммутатор может ее сломать?Проблема с потоковой безопасностью?

// thread-safe queue 
// inspired by http://msmvps.com/blogs/vandooren/archive/2007/01/05/creating-a-thread-safe-producer-consumer-queue-in-c-without-using-locks.aspx 
// only works with one producer and one consumer 
#include <pthread.h> 
#include <exception> 

template<class T> 
class tsqueue 
{ 
    private: 
     volatile int m_ReadIndex, m_WriteIndex; 
     volatile T *m_Data; 
     volatile bool m_Done; 
     const int m_Size; 
     pthread_mutex_t m_ReadMutex, m_WriteMutex; 
     pthread_cond_t m_ReadCond, m_WriteCond; 
    public: 
     tsqueue(const int &size); 
     ~tsqueue(); 
     void push(const T &elem); 
     T pop(); 
     void terminate(); 
     bool isDone() const; 
}; 

template <class T> 
tsqueue<T>::tsqueue(const int &size) : m_ReadIndex(0), m_WriteIndex(0), m_Size(size), m_Done(false) { 
    m_Data = new T[size]; 
    pthread_mutex_init(&m_ReadMutex, NULL); 
    pthread_mutex_init(&m_WriteMutex, NULL); 
    pthread_cond_init(&m_WriteCond, NULL); 
    pthread_cond_init(&m_WriteCond, NULL); 
} 

template <class T> 
tsqueue<T>::~tsqueue() { 
    delete[] m_Data; 
    pthread_mutex_destroy(&m_ReadMutex); 
    pthread_mutex_destroy(&m_WriteMutex); 
    pthread_cond_destroy(&m_ReadCond); 
    pthread_cond_destroy(&m_WriteCond); 
} 


template <class T> 
void tsqueue<T>::push(const T &elem) { 
    int next = (m_WriteIndex + 1) % m_Size; 
    if(next == m_ReadIndex) { 
     pthread_mutex_lock(&m_WriteMutex); 
     pthread_cond_wait(&m_WriteCond, &m_WriteMutex); 
     pthread_mutex_unlock(&m_WriteMutex); 
    } 
    m_Data[m_WriteIndex] = elem; 
    m_WriteIndex = next; 
    pthread_cond_signal(&m_ReadCond); 
} 

template <class T> 
T tsqueue<T>::pop() { 
    if(m_ReadIndex == m_WriteIndex) { 
     pthread_mutex_lock(&m_ReadMutex); 
     pthread_cond_wait(&m_ReadCond, &m_ReadMutex); 
     pthread_mutex_unlock(&m_ReadMutex); 
     if(m_Done && m_ReadIndex == m_WriteIndex) throw "queue empty and terminated"; 
    } 
    int next = (m_ReadIndex +1) % m_Size; 
    T elem = m_Data[m_ReadIndex]; 
    m_ReadIndex = next; 
    pthread_cond_signal(&m_WriteCond); 
    return elem; 
} 

template <class T> 
void tsqueue<T>::terminate() { 
    m_Done = true; 
    pthread_cond_signal(&m_ReadCond); 
} 

template <class T> 
bool tsqueue<T>::isDone() const { 
    return (m_Done && m_ReadIndex == m_WriteIndex); 
} 

Это может быть использовано, как это:

// thread 1 
while(cin.get(c)) { 
    queue1.push(c); 
} 
queue1.terminate(); 


// thread 2 
while(!queue1.isDone()) { 
    try{ c = queue1.pop(); } 
    catch(char const* str){break;} 
    cout.put(c); 
} 

Если кто-то видит проблемы с этим, пожалуйста, так сказать :)

+0

MSVC компилятор? Вы работаете на многоядерной машине? – RobS

+0

Вы действительно измерили точные коэффициенты 93%/7%? ЛОЛ. –

+0

Rob - gcc. Я * думаю * одноядерный, но я запускаю его на школьном компьютере по PuTTy, поэтому я не совсем уверен. Augusto - Ran it 32 раза, он провалился дважды. Близкое приближение :) Думаю, я должен был округлить до 94% – mpen

ответ

1

Если вы хотите что-нибудь с достойной производительностью, я бы настоятельно рекомендую демпинг вашего R/W замок и просто использовать очень простой спинлок. Или, если вы действительно думаете, что можете получить производительность, которую хотите с блокировкой R/W, я бы бросил ваш собственный на основе дизайна this (одно слово R/W Spinlock) от Джо Даффи.

+0

А, спасибо. Я дам это прочитать! Я все еще новичок в многопоточности. – mpen

3

Если это ваш фактический код, одна проблема сразу же выключить bat состоит в том, что вы инициализируете m_WriteCond дважды, а не инициализируете m_ReadCond вообще.

+0

О, ничего себе, полностью пропустил это. Я удивлен, что эта штука не взорвалась. Спасибо :) – mpen

8

Да, есть определенно проблемы здесь. Все ваши обращения к переменным-членам очереди встречаются за пределами мьютексы. На самом деле, я не совсем уверен, что ваши мьютексы защищают, так как они просто ждут переменной условия.

Кроме того, оказывается, что ваш читатель и писатель всегда будет работать в карцер шаг, не позволяя очереди, чтобы выйти за пределы одного элемента в размерах.

+0

Единственная причина, по которой у меня есть мьютексы, - это то, что требует их pthread_cond_wait. Поскольку у меня только один читатель или писатель за раз, никакая переменная не должна читаться или записываться одновременно (с мьютексами или без них). – mpen

+0

О, и я не уверен, что вы имеете в виду, они будут действовать в блокировке. Почему они? Разве это не произойдет, только если поток 1 и поток 2 будут работать с одинаковой скоростью и будут на отдельных процессорах? В противном случае поток 1 может выполняться несколько раз до того, как будет вызван поток 2. – mpen

0

Кажется, что проблема заключается в том, что у вас есть условие гонки, что нить 2 CAN выполнить до резьбы-никогда не делает какой-либо cin.get (с). Необходимо убедиться, что данные инициализированы и когда вы получаете информацию о том, что вы делаете что-то, если данные не были введены.

Может быть, это я не видел остальную часть кода, где это делается, хотя.

+0

Думаю, что вам что-то не хватает. если сначала вызывается 'pop()', тогда 'm_ReadIndex == m_WriteIndex' и поток 2 будут спать до тех пор, пока не будут сигнализированы. – mpen

2

Вы должны относиться к этому классу, как monitor. У вас должна быть «блокировка монитора» для каждой очереди (обычный мьютекс). Всякий раз, когда вы вводите метод, который читает или записывает любое поле в очереди, вы должны заблокировать этот мьютекс, как только вы его введете. Это предотвращает одновременное взаимодействие нескольких потоков с очередью. Вы должны освободить блокировку до того, как будете ждать состояния, и когда вы оставите метод, чтобы другие потоки могли войти. Обязательно заново закрепите замок, когда вы закончите, ожидая состояния.

+0

Кажется немного неэффективным для блокировки всей очереди, нет?Нет причин, по которым он не должен быть способен читать и писать одновременно, если это не тот же элемент. Я ценю предложение, хотя :) – mpen

+0

Вам нужно защитить все общее состояние. Оба читателя и писатели получают доступ к обоим индексам, поэтому вам действительно нужно заблокировать всю очередь. С двумя потоками это не проблема. Если у вас много читателей и писателей, посмотрите на проблему читателей-писателей для более эффективного подхода. –

 Смежные вопросы

  • Нет связанных вопросов^_^