2012-03-22 3 views
1

Предисловие: я новичок в многопоточном программировании и немного ржавый с C++. Мои требования - использовать один мьютекс и два условия: mNotEmpty и mEmpty. Я также должен создавать и заполнять векторы так, как указано ниже.Один продюсер, два потребителя, выступающий в одной очереди, произведенный продюсером

У меня есть одна продюсерская нить, создающая вектор случайных чисел размером n*2 и два пользователя, вставляющих эти значения в два отдельных вектора размера n.

Я делаю следующее в производителе:

  1. блокировки мьютекс: pthread_mutex_lock(&mMutex1)
  2. Подожди потребитель сказать вектор пуст: pthread_cond_wait(&mEmpty,&mMutex1)
  3. оттеснить значение в вектор
  4. Сигнал потребителю о том, что вектор больше не пуст: pthread_cond_signal(&mNotEmpty)
  5. Разблокировать мьютексы: pthread_mutex_unlock(&mMutex1)
  6. Вернуться к шагу 1

В потребителе:

  1. Замка мьютекс: pthread_mutex_lock(&mMutex1)
  2. Проверьте, чтобы увидеть, если вектор пуст, и если да сигнализировать производитель: pthread_cond_signal(&mEmpty)
  3. Повторное добавление значения в один из двух новых векторов (в зависимости от того, какой поток) и удалить из исходного вектора
  4. Разблокировать мьютексы: pthread_mutex_unlock(&mMutex1)
  5. Вернуться к шагу 1

Что случилось с моим процессом? Я продолжаю получать ошибки сегментации или бесконечные циклы.

Edit: Вот код:

void Producer() 
{ 
    srand(time(NULL)); 
    for(unsigned int i = 0; i < mTotalNumberOfValues; i++){ 
     pthread_mutex_lock(&mMutex1); 

     pthread_cond_wait(&mEmpty,&mMutex1); 
     mGeneratedNumber.push_back((rand() % 100) + 1); 

     pthread_cond_signal(&mNotEmpty); 
     pthread_mutex_unlock(&mMutex1); 
    } 
} 

void Consumer(const unsigned int index) 
{ 
    for(unsigned int i = 0; i < mNumberOfValuesPerVector; i++){ 
     pthread_mutex_lock(&mMutex1); 
     if(mGeneratedNumber.empty()){ 
      pthread_cond_signal(&mEmpty); 
     }else{ 
      mThreadVector.at(index).push_back[mGeneratedNumber.at(0)]; 
      mGeneratedNumber.pop_back(); 
     } 

     pthread_mutex_unlock(&mMutex1); 
    } 
} 
+1

Это было бы намного легче ответить, если бы вы представили минимальную компилируемую программу, которая продемонстрировала вашу проблему. – Mankarse

+0

Ну, если вы ждете внутри замка, тогда ожидайте тупика где-нибудь в какой-то момент в будущем. – ActiveTrayPrntrTagDataStrDrvr

+0

Шаг 4 производителя говорит, чтобы сигнализировать потребителю с mNotEmpty, но mNotEmpty не используется потребителем. –

ответ

4

Я не уверен, я понимаю, обоснование пути вы делаете вещи. В обычной идиоме потребительского провайдера провайдер подталкивает как множество возможных элементов в канал, ожидая, только если есть недостаточно места в канале; он не ждет пустой. Так обычно идиома будет:

поставщик (нажать один элемент):

pthread_mutex_lock(&mutex); 
while (! spaceAvailable()) { 
    pthread_cond_wait(&spaceAvailableCondition, &mutex); 
} 
pushTheItem(); 
pthread_cond_signal(&itemAvailableCondition); 
pthread_mutex_unlock(&mutex); 

и на стороне потребителя, чтобы получить деталь:

pthread_mutex_lock(&mutex); 
while (! itemAvailable()) { 
    pthread_cond_wait(&itemAvailableCondition, &mutex); 
} 
getTheItem(); 
pthread_cond_signal(&spaceAvailableCondition); 
pthread_mutex_unlock(&mutex); 

Обратите внимание, что для каждого условия , одна сторона сигнализирует, а другая ждет. (I не видят ожидания у вашего потребителя.) И если есть более одного процесса с обеих сторон, я бы рекомендовал использовать pthread_cond_broadcast, , а не pthread_cond_signal.

В коде есть ряд других проблем. Некоторые из них выглядят более как опечатки: вы должны скопировать/вставить фактический код, чтобы этого избежать. У вас действительно означает читать и выталкивать mGeneratedValues, когда вы нажимаете на mGeneratedNumber и проверяете, что это пусто? (Если у вас действительно есть , у вас есть две разные очереди, то вы выходите из очереди, где нет , который нажал.) И у вас нет циклов, ожидающих условий ; вы продолжаете повторяться через количество элементов, которые вы ожидаете от (увеличивая счетчик каждый раз, так что вы, вероятно, будете gerninate задолго до того, как вы должны) — Я не вижу бесконечного цикла , но я с легкостью вижу бесконечное ожидание в pthread_cond_wait в производитель. Я не вижу основной свалки, но что происходит, когда один процессов завершается (вероятно, потребитель, потому что он никогда не ничего не ждет); если он завершит уничтожение мьютекса или переменных , вы можете получить дамп ядра, когда другой процесс пытается использовать .

0

Вы могли бы рассмотреть вопрос о принятии семафор только после того, как условие выполняется, например,

producer() 
{ 
    while true 
    { 
     waitForEmpty(); 
     takeMutex(); 
     produce(); 
     releaseMutex(); 
    } 
} 

consumer() 
{ 
    while true 
    { 
     waitForNotEmpty(); 
     takeMutex(); 
     consume(); 
     releaseMutex(); 
    } 
} 
+0

Это то, о чем я и подумал, но я посмотрел pthread_cond_wait, и он ожидает, что будет передан заблокированный мьютекс, который он разблокирует, прежде чем он начнет ждать. –

+0

-1 потому что это просто неправильно. Вы не можете 'waitForEmpty' без использования мьютекса, и вы не можете освободить мьютекс между' waitForEmpty' и 'производить', не рискуя условиями гонки. –

+0

Извините, вы правы. Подумав об этом, я подумал, что делать это, как я писал, может привести к состоянию гонки, если, скажем, два потребителя закончены, только один принимает мьютекс и опустошает вектор, а другой входит в защищенный раздел. В любом случае, вот хороший рабочий пример того, что вы пытаетесь сделать: http://www.cs.loyola.edu/~jglenn/702/S2008/Examples/ProducerConsumer/pc_cpp.html –

1

В производителю позвоните pthread_cond_wait, только когда очередь не пуста. В противном случае вы будете заблокированы навсегда из-за состояния гонки.

0
Here is a solution to a similar problem like you. In this program producer produces a no and writes it to a array(buffer) and a maintains a file then update a status(status array) about it, while on getting data in the array(buffer) consumers start to consume(read and write to their file) and update a status that it has consumed. when producer looks that both the consumer has consumed the data it overrides the data with a new value and goes on. for convenience here i have restricted the code to run for 2000 nos. 

// Producer-consumer // 

#include <iostream> 
#include <fstream> 
#include <pthread.h> 

#define MAX 100 

using namespace std; 

int dataCount = 2000; 

int buffer_g[100]; 
int status_g[100]; 

void *producerFun(void *); 
void *consumerFun1(void *); 
void *consumerFun2(void *); 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 

pthread_cond_t dataNotProduced = PTHREAD_COND_INITIALIZER; 
pthread_cond_t dataNotConsumed = PTHREAD_COND_INITIALIZER; 

int main() 
{ 

    for(int i = 0; i < MAX; i++) 
    status_g[i] = 0; 

    pthread_t producerThread, consumerThread1, consumerThread2; 

    int retProducer = pthread_create(&producerThread, NULL, producerFun, NULL); 
    int retConsumer1 = pthread_create(&consumerThread1, NULL, consumerFun1, NULL); 
    int retConsumer2 = pthread_create(&consumerThread2, NULL, consumerFun2, NULL); 

    pthread_join(producerThread, NULL); 
    pthread_join(consumerThread1, NULL); 
    pthread_join(consumerThread2, NULL); 

    return 0; 

} 

void *producerFun(void *) 
{ 
    //file to write produced data by producer 
    const char *producerFileName = "producer.txt"; 
    ofstream producerFile(producerFileName); 
    int index = 0, producerCount = 0; 

    while(1) 
    { 
     pthread_mutex_lock(&mutex); 

     if(index == MAX) 
     { 
     index = 0; 
     } 
     if(status_g[index] == 0) 
     { 

     static int data = 0; 
     data++; 

     cout << "Produced: " << data << endl; 

     buffer_g[index] = data; 

     producerFile << data << endl; 

     status_g[index] = 5; 

     index ++; 
     producerCount ++; 

     pthread_cond_broadcast(&dataNotProduced); 
     } 
     else 
     { 
     cout << ">> Producer is in wait.." << endl; 
     pthread_cond_wait(&dataNotConsumed, &mutex); 
     } 
     pthread_mutex_unlock(&mutex); 

     if(producerCount == dataCount) 
     { 
     producerFile.close(); 
     return NULL; 
     } 
    } 
} 

void *consumerFun1(void *) 
{ 
    const char *consumerFileName = "consumer1.txt"; 
    ofstream consumerFile(consumerFileName); 
    int index = 0, consumerCount = 0; 

    while(1) 
    { 
    pthread_mutex_lock(&mutex); 
    if(index == MAX) 
    { 
     index = 0; 
    } 

    if(status_g[index] != 0 && status_g[index] != 2) 
    { 
     int data = buffer_g[index]; 

     cout << "Cosumer1 consumed: " << data << endl; 
     consumerFile << data << endl; 

     status_g[index] -= 3; 

     index ++; 
     consumerCount ++; 

     pthread_cond_signal(&dataNotConsumed); 
    } 
    else 
    { 
     cout << "Consumer1 is in wait.." << endl; 
     pthread_cond_wait(&dataNotProduced, &mutex); 
    } 
     pthread_mutex_unlock(&mutex); 
    if(consumerCount == dataCount) 
    { 
     consumerFile.close(); 
     return NULL; 
    } 
    } 
} 

void *consumerFun2(void *) 
{ 
    const char *consumerFileName = "consumer2.txt"; 
    ofstream consumerFile(consumerFileName); 
    int index = 0, consumerCount = 0; 

    while(1) 
    { 
    pthread_mutex_lock(&mutex); 
    if(index == MAX) 
    { 
     index = 0; 
    } 

    if(status_g[index] != 0 && status_g[index] != 3) 
    { 


     int data = buffer_g[index]; 
     cout << "Consumer2 consumed: " << data << endl; 
     consumerFile << data << endl; 

     status_g[index] -= 2; 

     index ++; 
     consumerCount ++; 

     pthread_cond_signal(&dataNotConsumed); 
    } 
    else 
    { 
     cout << ">> Consumer2 is in wait.." << endl; 
     pthread_cond_wait(&dataNotProduced, &mutex); 
    } 
    pthread_mutex_unlock(&mutex); 

    if(consumerCount == dataCount) 
    { 
     consumerFile.close(); 
     return NULL; 
    } 
    } 
} 

Here is only one problem that producer in not independent to produce, that is it needs to take lock on the whole array(buffer) before it produces new data, and if the mutex is locked by consumer it waits for that and vice versa, i am trying to look for it.