Я написал общую очередь в C, которая должна использоваться для различных типов полезных данных. Это блокирующая очередь, так что потребительские потоки блокируют ожидание того, что очередь будет заполнена потоками производителя.Блокировка потока на sem_wait вызывает зависание других потоков
Я проверил код очереди в изоляции, используя check
, включая поведение, в котором поток блокирует ожидание значения, которое будет добавлено в очередь. Тем не менее, все эти тесты прошли при интеграции очереди в остальную часть кода. Я сталкиваюсь с ситуацией, когда первый раз поток пытается заблокировать очередь, все остальные потоки висят.
Чтобы быть конкретным, программа, с которой я интегрируюсь, является членом более крупной экосистемы, поэтому есть сценарий запуска, который инициализирует программу, которая затем демонизирует. Затем демонанизированный поток создает несколько отдельных потоков для выполнения различных функций. Один из этих потоков вызывает вызов sem_wait
, и все потоки зависают, включая поток, который породил демона.
Чтобы подтвердить, что этот вызов был проблемой, я запустил программу в режиме без демона с помощью отладчика, который подтвердил, что висит sem_wait
. Я также добавил sleep
перед тем, как развернуть поток, который ждет очереди. В этом случае другие потоки продвигались дальше, а затем висели, когда был сделан вызов sem_wait
.
Очередь, о которой идет речь, доступна только для этой программы. Его ссылка хранится как глобальная переменная. Очередь, безусловно, пуста, когда выполняется вызов sem_wait
.
Ниже приведен код очереди:
//Queue.h
#include <pthread.h>
#include <semaphore.h>
typedef void (*freeFunction)(void *);
typedef struct _queueNode {
void *data;
struct _queueNode *next;
} queueNode;
typedef struct queue {
sem_t *logicalLength;
size_t elementSize;
queueNode *head;
queueNode *tail;
freeFunction freeFn;
pthread_mutex_t *queueLock;
} queue_t;
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn);
void queue_destroy(queue_t *queue); // Removes all elements from the queue
int queue_size(queue_t *queue); // Returns the number of elements in the queue
void queue_add(queue_t *queue, void *element); // Adds to tail
int queue_take(queue_t *queue, void *elementBuffer); // Returns/removes head, blocks if empty
//Queue.c
#include <stdlib.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include "Queue.h"
void queue_initialize(queue_t *queue, size_t elementSize, freeFunction freeFn) {
assert(elementSize > 0);
assert(queue != NULL);
queue->elementSize = elementSize;
queue->head = NULL;
queue->tail = NULL;
queue->freeFn = freeFn;
queue->logicalLength = calloc(1, sizeof(sem_t));
queue->queueLock = calloc(1, sizeof(pthread_mutex_t));
sem_init(queue->logicalLength, 0, 0);
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
pthread_mutex_init(queue->queueLock, &attr);
}
void queue_destroy(queue_t *queue) {
assert(queue != NULL);
queueNode *current;
while(queue->head != NULL) {
current = queue->head;
queue->head = current->next;
if(queue->freeFn != NULL) {
queue->freeFn(current->data);
}
free(current->data);
free(current);
}
queue->head = NULL;
queue->tail = NULL;
pthread_mutex_destroy(queue->queueLock);
sem_destroy(queue->logicalLength);
free(queue->queueLock);
free(queue->logicalLength);
}
void queue_add(queue_t *queue, void *element) {
assert(queue != NULL);
assert(element != NULL);
pthread_mutex_lock(queue->queueLock);
queueNode *node = calloc(1, sizeof(queueNode));
node->data = calloc(1, queue->elementSize);
node->next = NULL;
memcpy(node->data, element, queue->elementSize);
if(queue->head == NULL) {
queue->head = queue->tail = node;
} else {
queue->tail->next = node;
queue->tail = node;
}
sem_post(queue->logicalLength);
pthread_mutex_unlock(queue->queueLock);
}
void queue_removeNode(queue_t *queue, void *elementBuffer) {
pthread_mutex_lock(queue->queueLock);
if(queue->head == NULL) {
pthread_mutex_unlock(queue->queueLock);
return;
}
queueNode *node = queue->head;
memcpy(elementBuffer, node->data, queue->elementSize);
if(queue->head == queue->tail)
queue->tail = NULL;
queue->head = node->next;
if(queue->freeFn) {
queue->freeFn(node->data);
}
free(node->data);
free(node);
pthread_mutex_unlock(queue->queueLock);
}
int queue_take(queue_t *queue, void *elementBuffer) {
assert(queue != NULL);
assert(elementBuffer != NULL);
int result = EXIT_SUCCESS;
sem_wait(queue->logicalLength);
queue_removeNode(queue, elementBuffer);
return result;
}
Ниже приведен код, который выявил проблему:
//fei.h
...
#include "Queue.h"
extern queue_t *commandQueue;
...
//fei.c
#include "fei.h"
#include "commandHandler.h"
#include "Queue.h"
queue_t *commandQueue;
int main (int argc, char **argv){
int debugFlag = handleOpts(argc, argv);
if(!debugFlag){
int rc = daemonize();
if(rc != 0){
exit(rc);
}
}
rc = setConfigValues();
if(rc){
exit(rc);
}
queue_t *commandQueue = calloc(1, sizeof(queue_t));
queue_initialize(commandQueue, sizeof(commandPack_t), commandFree);
if(getPortIsock() == 0){ // This is a simple config value
exit(EXIT_FAILURE);
}
signal(SIGPIPE, SIG_IGN);
pthread_t id;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&id, &attr, receiveCommands, NULL);
pthread_create(&id, &attr, processCommands, NULL);
if(!setSocketIsock()){
exit(1);
}
while(!checkIfConnectedToSct())
usleep(50000);
receiveCCSDSPackets();
exit (0);
}
// commandHandler.c
#include "Queue.h"
#include "fei.h"
#include "commandHandler.h"
queue_t *commandQueue;
void *receiveCommands(){
getNewCsockConnection();
connectedToSct = 1;
while(1){
commandPack_t cmd;
int validCommand = getCommand(CSOCKET, &cmd);
if(validCommand == RECEIVE_SUCCESS){
queue_add(commandQueue, &cmd);
} else{
usleep(5000);
}
}
return NULL;
}
void *processCommands(){
while(1){
commandPack_t cmdToProcess;
/* Blocking queue */
queue_take(commandQueue, &cmdToProcess);
switch(cmdToProcess.command){
// Command processing
}
commandFree(&cmdToProcess);
}
return NULL;
}
receiveCommands
функцией является нитью производителя и функция processCommands
является потребитель нить , Это единственные места в базе кода, которые относятся к commandQueue
. Хотя переменная, выполнение основного потока редко выходит за пределы проверки состояния setSocketIsock()
.
Любое понимание оценено.
В queue_add(), это обычно выкладывает semaphre блока после снятия блокировки, а не внутри него, в Одере, чтобы предотвратить тека нити сразу становишься бегом, попав в блокировки, а затем снова останавливаться до тех пор, пока добавочный поток не начнет отпускать блокировку. Это не вызывает вашу блокировку, хотя :( –
Спасибо. Это хороший момент. Я также знаю, что я должен проверять вывод 'sem_wait' в том случае, если он не блокирует успешно. Слишком плохо, моя проблема в том, что это блокировка TOO успешно. – nmogk