2016-10-08 6 views
-2

Я хотел бы создать программу мульти-потоки в C (Linux) с:Multi-темы архитектуры программа C потоковой

  1. бесконечный цикл с бесконечным числом задач
  2. Один поток на одной задаче
  3. Ограничьте общее количество потоков, так что если, например, общее количество потоков больше MAX_THREADS_NUMBER, выполните sleep(), пока общий номер потоков не станет меньше MAX_THREADS_NUMBER, продолжите его.

Резюме: мне нужно сделать бесконечное число задач (одно задание на одну нить), и я хотел бы знать, как реализовать его с помощью Pthreads в С.

Вот мой код:

#include <stdio.h> 
#include <string.h> 
#include <pthread.h> 
#include <stdlib.h> 
#include <unistd.h> 


#define MAX_THREADS 50 


pthread_t thread[MAX_THREADS]; 
int counter; 
pthread_mutex_t lock; 

void* doSomeThing(void *arg) 
{ 
    pthread_mutex_lock(&lock); 
    counter += 1; 
    printf("Job %d started\n", counter); 
    pthread_mutex_unlock(&lock); 

    return NULL; 
} 

int main(void) 
{ 
    int i = 0; 
    int ret; 

    if (pthread_mutex_init(&lock, NULL) != 0) 
    { 
     printf("\n mutex init failed\n"); 
     return 1; 
    } 

    for (i = 0; i < MAX_THREADS; i++) { 
     ret = pthread_create(&(thread[i]), NULL, &doSomeThing, NULL); 
     if (ret != 0) 
      printf("\ncan't create thread :[%s]", strerror(ret)); 
    } 

    // Wait all threads to finish 
    for (i = 0; i < MAX_THREADS; i++) { 
     pthread_join(thread[i], NULL); 
    } 

    pthread_mutex_destroy(&lock); 

    return 0; 
} 

Как сделать этот цикл бесконечным?

for (i = 0; i < MAX_THREADS; i++) { 
    ret = pthread_create(&(thread[i]), NULL, &doSomeThing, NULL); 
    if (ret != 0) 
     printf("\ncan't create thread :[%s]", strerror(ret)); 
} 

мне нужно что-то вроде этого:

while (1) { 
    if (thread_number > MAX_THREADS_NUMBER) 
     sleep(1); 

    ret = pthread_create(...); 
    if (ret != 0) 
     printf("\ncan't create thread :[%s]", strerror(ret)); 
} 
+2

Используйте пул потоков (https://en.wikipedia.org/wiki/Thread_pool). –

+0

То, что вы хотите, недостаточно ясно. Не могли бы вы рассказать? – shrike

+0

@shrike Мне нужно делать бесконечное количество задач (одна задача в одном потоке), и я хотел бы знать, как ее реализовать с помощью pthreads в C. Это то, что вы хотели знать? –

ответ

1

Ваша текущая программа основана на простой дизайн отправки: начальная нить создает рабочие потоки, назначая каждому из них задачу выполнить. Ваш вопрос в том, как вы делаете эту работу для любого количества задач, любого количества рабочих потоков. Ответ в том, что вы этого не делаете: ваш выбранный дизайн делает такую ​​модификацию практически невозможной.

Даже если бы я ответил на ваши заявленные вопросы, это не повлияло бы на поведение программы, как вам хотелось бы. Это может сработать после моды, но это будет похоже на велосипед с квадратными колесами: не очень практичный, не прочный - даже не веселый после того, как вы перестанете смеяться над тем, как это выглядит глупо.

Решение, как я писал в комментарии к исходному вопросу, заключается в изменении базового дизайна: от простой отправки до подхода thread pool.

Реализация пула потоков требует двух вещей: во-первых, это изменение вашей точки зрения от начала потока и выполнение задачи, для каждого потока в «пуле», захвата выполняемой задачи и возврата в «пул» «после того, как они выполнили это. Понимание этого - трудная часть. Вторая часть, реализующая способ для каждого потока, чтобы захватить новую задачу, проста: это обычно сосредотачивается вокруг структуры данных, защищенной некоторыми замками. Точная структура данных зависит от того, что должна делать фактическая работа.

Предположим, вы хотели распараллелить вычисления Mandelbrot set (точнее, время выхода или количество итераций, необходимых до того, как точка может быть выведена за пределы установленного значения, страница Wikipedia содержит псевдокод именно для этого) , Это одна из «смущающих параллелей» проблем; те, где суб-проблемы (здесь каждая точка) могут быть решены без каких-либо зависимостей.

Вот как я мог бы сделать ядро ​​пула потоков в этом случае. Во-первых, для каждой точки необходимо записать время выхода или счетчик итераций. Предположим, для этого мы используем unsigned int. Нам также нужно количество точек (это 2D-массив), способ вычисления комплексного числа, соответствующего каждой точке, а также некоторый способ узнать, какие точки были либо вычислены, либо вычислены. Плюс взаимоисключающая блокировка, так что только один поток сразу изменит структуру данных.Итак:

typedef struct { 
    int    x_size, y_size; 
    size_t   stride; 
    double   r_0, i_0; 
    double   r_dx, i_dx; 
    double   r_dy, i_dy; 
    unsigned int  *iterations; 
    sem_t    done; 
    pthread_mutex_t mutex; 
    int    x, y; 
} fractal_work; 

Когда экземпляр fractal_work построен, x_size и y_size являются количество столбцов и строк в iterations карте. Количество итераций (или время выхода) для точки x, y хранится в iterations[x+y*stride]. Реальная часть комплексной координаты для этой точки - r_0 + x*r_dx + y*r_dy, а мнимая часть - i_0 + x*i_dx + y*i_dy (что позволяет свободно масштабировать и вращать фрактал).

Когда поток захватывает следующую доступную точку, он первым блокирует mutex и копирует x и y значения (для себя работать на). Затем увеличивается x. Если x >= x_size, он сбрасывает x в ноль и увеличивает y. Наконец, он разблокирует mutex и вычисляет время эвакуации для этой точки.

Однако, если x == 0 && y >= y_size, сообщения о потоках на семафоре done и выходы, позволяя начальному потоку знать, что фрактал завершен. (Начальная нить просто нужно вызвать sem_wait() один раз для каждого потока она создана.)

Нить функция работник затем что-то вроде следующего:

void *fractal_worker(void *data) 
{ 
    fractal_work *const work = (fractal_work *)data; 
    int   x, y; 

    while (1) { 

     pthread_mutex_lock(&(work->mutex)); 

     /* No more work to do? */ 
     if (work->x == 0 && work->y >= work->y_size) { 
      sem_post(&(work->done)); 
      pthread_mutex_unlock(&(work->mutex)); 
      return NULL; 
     } 

     /* Grab this task (point), advance to next. */ 
     x = work->x; 
     y = work->y; 
     if (++(work->x) >= work->x_size) { 
      work->x = 0; 
      ++(work->y); 
     } 

     pthread_mutex_unlock(&(work->mutex)); 

     /* z.r = work->r_0 + (double)x * work->r_dx + (double)y * work->r_dy; 
      z.i = work->i_0 + (double)x * work->i_dx + (double)y * work->i_dy; 

      TODO: implement the fractal iteration, 
       and count the iterations (say, n) 

       save the escape time (number of iterations) 
       in the work->iterations array; e.g. 
      work->iterations[(size_t)x + work->stride*(size_t)y] = n; 
     */ 
    } 
} 

Программа сначала создает структуру данных fractal_work для работника потоки для работы, инициализирует его, затем создает некоторое количество потоков, давая каждой нити адрес этой структуры fractal_work. Затем он может также позвонить в fractal_worker(), чтобы «присоединиться к пулу потоков». (Этот пул автоматически «водостоки», т.е. потоки будут возвращаться/выход, когда все точки фрактала сделаны.)

Наконец, основной поток вызывает sem_wait() на done семафора, столько раз, сколько он создал рабочие потоки, чтобы вся работа была выполнена.

Точные поля в структуре fractal_work не имеют значения. Тем не менее, он находится в самом ядре пула потоков. Как правило, существует хотя бы один мьютекс или rwlock, защищающий рабочие детали, так что каждый рабочий поток получает уникальные рабочие данные, а также некоторую флаговую или условную переменную или семафор, чтобы исходный поток знал, что задача завершена.

На многопоточном сервере обычно имеется только один экземпляр структуры (или переменных), описывающей рабочую очередь. Он может даже содержать такие вещи, как минимальное и максимальное количество потоков, позволяя рабочим потокам контролировать собственный номер, чтобы динамически реагировать на объем доступной работы. Это звучит магически, но на самом деле просто реализовать: когда поток завершил свою работу или проснулся в пуле без работы и удерживает мьютекс, он сначала анализирует, сколько заданий в очереди есть, и что такое текущий число рабочих потоков. Если количество потоков меньше, чем минимальное, и никаких действий не требуется, поток уменьшает количество потоков и завершает работу. Если количество потоков меньше, чем число потоков, и есть много работы, поток сначала создает новый поток, а затем захватывает следующую задачу для работы. (Да, любой поток может создавать новые потоки в процессе. Все они тоже на равных основаниях.)

Много кода в практическом многопоточном приложении, использующем один или несколько пулов потоков для выполнения работы, бухгалтерского учета. Поток пула потоков очень концентрируется на данных, и вычисления должны выполняться над данными.Я уверен, что там есть куда лучшие примеры пулов потоков; сложная часть состоит в том, чтобы придумать хорошую задачу для приложения, поскольку структуры данных так зависят от задачи, и многие вычисления настолько просты, что их распараллеливание не имеет смысла (поскольку создание новых потоков имеет небольшую вычислительную стоимость, было бы глупо тратить время на создание потоков, когда один поток выполняет ту же работу в одно и то же время или меньше).

С другой стороны, многие задачи, которые выгодны для параллелизации, требуют обмена информацией между работниками, и для этого требуется много размышлений для правильной реализации. (Например, хотя решения для параллелизации симуляций молекулярной динамики эффективно, большинство симуляторов все еще вычисляют и обмениваются данными на отдельных этапах, а не в одно и то же время. Это просто трудно сделать правильно.)

Все это означает, что вы не можете ожидать, что сможете написать код, если не понимаете концепцию. Действительно, истинное понимание понятий - это трудная задача: писать код сравнительно легко.

Даже в приведенном выше примере есть определенные моменты отключения: Есть ли порядок отправки семафора и отпускания мьютекса? (Ну, это зависит от того, что поток, ожидающий завершения фрактала, - и действительно, если он еще ждет.) Если бы это была переменная условия вместо семафора, было бы важно, чтобы поток, который заинтересованный в завершении фрактала, ожидает переменную условия, иначе он будет пропускать сигнал/трансляцию. (Вот почему я использовал семафор.)

+0

Большое спасибо за ваш ответ. Не могли бы вы разместить полный пример, пожалуйста. –

+0

@SebastianRockefeller: Зачем вам нужен полный пример? Как я уже сказал, общая концепция пула потоков - это трудная часть, и самое главное. Ищите это в своих справочных материалах, ссылках на странице Википедии или в одном из попаданий веб-поиска при поиске '' пула потоков пулов ''. –

 Смежные вопросы

  • Нет связанных вопросов^_^