Ваша текущая программа основана на простой дизайн отправки: начальная нить создает рабочие потоки, назначая каждому из них задачу выполнить. Ваш вопрос в том, как вы делаете эту работу для любого количества задач, любого количества рабочих потоков. Ответ в том, что вы этого не делаете: ваш выбранный дизайн делает такую модификацию практически невозможной.
Даже если бы я ответил на ваши заявленные вопросы, это не повлияло бы на поведение программы, как вам хотелось бы. Это может сработать после моды, но это будет похоже на велосипед с квадратными колесами: не очень практичный, не прочный - даже не веселый после того, как вы перестанете смеяться над тем, как это выглядит глупо.
Решение, как я писал в комментарии к исходному вопросу, заключается в изменении базового дизайна: от простой отправки до подхода thread pool.
Реализация пула потоков требует двух вещей: во-первых, это изменение вашей точки зрения от начала потока и выполнение задачи, для каждого потока в «пуле», захвата выполняемой задачи и возврата в «пул» «после того, как они выполнили это. Понимание этого - трудная часть. Вторая часть, реализующая способ для каждого потока, чтобы захватить новую задачу, проста: это обычно сосредотачивается вокруг структуры данных, защищенной некоторыми замками. Точная структура данных зависит от того, что должна делать фактическая работа.
Предположим, вы хотели распараллелить вычисления Mandelbrot set (точнее, время выхода или количество итераций, необходимых до того, как точка может быть выведена за пределы установленного значения, страница Wikipedia содержит псевдокод именно для этого) , Это одна из «смущающих параллелей» проблем; те, где суб-проблемы (здесь каждая точка) могут быть решены без каких-либо зависимостей.
Вот как я мог бы сделать ядро пула потоков в этом случае. Во-первых, для каждой точки необходимо записать время выхода или счетчик итераций. Предположим, для этого мы используем unsigned int
. Нам также нужно количество точек (это 2D-массив), способ вычисления комплексного числа, соответствующего каждой точке, а также некоторый способ узнать, какие точки были либо вычислены, либо вычислены. Плюс взаимоисключающая блокировка, так что только один поток сразу изменит структуру данных.Итак:
typedef struct {
int x_size, y_size;
size_t stride;
double r_0, i_0;
double r_dx, i_dx;
double r_dy, i_dy;
unsigned int *iterations;
sem_t done;
pthread_mutex_t mutex;
int x, y;
} fractal_work;
Когда экземпляр fractal_work
построен, x_size
и y_size
являются количество столбцов и строк в iterations
карте. Количество итераций (или время выхода) для точки x
, y
хранится в iterations[x+y*stride]
. Реальная часть комплексной координаты для этой точки - r_0 + x*r_dx + y*r_dy
, а мнимая часть - i_0 + x*i_dx + y*i_dy
(что позволяет свободно масштабировать и вращать фрактал).
Когда поток захватывает следующую доступную точку, он первым блокирует mutex
и копирует x
и y
значения (для себя работать на). Затем увеличивается x
. Если x >= x_size
, он сбрасывает x
в ноль и увеличивает y
. Наконец, он разблокирует mutex
и вычисляет время эвакуации для этой точки.
Однако, если x == 0 && y >= y_size
, сообщения о потоках на семафоре done
и выходы, позволяя начальному потоку знать, что фрактал завершен. (Начальная нить просто нужно вызвать sem_wait()
один раз для каждого потока она создана.)
Нить функция работник затем что-то вроде следующего:
void *fractal_worker(void *data)
{
fractal_work *const work = (fractal_work *)data;
int x, y;
while (1) {
pthread_mutex_lock(&(work->mutex));
/* No more work to do? */
if (work->x == 0 && work->y >= work->y_size) {
sem_post(&(work->done));
pthread_mutex_unlock(&(work->mutex));
return NULL;
}
/* Grab this task (point), advance to next. */
x = work->x;
y = work->y;
if (++(work->x) >= work->x_size) {
work->x = 0;
++(work->y);
}
pthread_mutex_unlock(&(work->mutex));
/* z.r = work->r_0 + (double)x * work->r_dx + (double)y * work->r_dy;
z.i = work->i_0 + (double)x * work->i_dx + (double)y * work->i_dy;
TODO: implement the fractal iteration,
and count the iterations (say, n)
save the escape time (number of iterations)
in the work->iterations array; e.g.
work->iterations[(size_t)x + work->stride*(size_t)y] = n;
*/
}
}
Программа сначала создает структуру данных fractal_work
для работника потоки для работы, инициализирует его, затем создает некоторое количество потоков, давая каждой нити адрес этой структуры fractal_work
. Затем он может также позвонить в fractal_worker()
, чтобы «присоединиться к пулу потоков». (Этот пул автоматически «водостоки», т.е. потоки будут возвращаться/выход, когда все точки фрактала сделаны.)
Наконец, основной поток вызывает sem_wait()
на done
семафора, столько раз, сколько он создал рабочие потоки, чтобы вся работа была выполнена.
Точные поля в структуре fractal_work
не имеют значения. Тем не менее, он находится в самом ядре пула потоков. Как правило, существует хотя бы один мьютекс или rwlock, защищающий рабочие детали, так что каждый рабочий поток получает уникальные рабочие данные, а также некоторую флаговую или условную переменную или семафор, чтобы исходный поток знал, что задача завершена.
На многопоточном сервере обычно имеется только один экземпляр структуры (или переменных), описывающей рабочую очередь. Он может даже содержать такие вещи, как минимальное и максимальное количество потоков, позволяя рабочим потокам контролировать собственный номер, чтобы динамически реагировать на объем доступной работы. Это звучит магически, но на самом деле просто реализовать: когда поток завершил свою работу или проснулся в пуле без работы и удерживает мьютекс, он сначала анализирует, сколько заданий в очереди есть, и что такое текущий число рабочих потоков. Если количество потоков меньше, чем минимальное, и никаких действий не требуется, поток уменьшает количество потоков и завершает работу. Если количество потоков меньше, чем число потоков, и есть много работы, поток сначала создает новый поток, а затем захватывает следующую задачу для работы. (Да, любой поток может создавать новые потоки в процессе. Все они тоже на равных основаниях.)
Много кода в практическом многопоточном приложении, использующем один или несколько пулов потоков для выполнения работы, бухгалтерского учета. Поток пула потоков очень концентрируется на данных, и вычисления должны выполняться над данными.Я уверен, что там есть куда лучшие примеры пулов потоков; сложная часть состоит в том, чтобы придумать хорошую задачу для приложения, поскольку структуры данных так зависят от задачи, и многие вычисления настолько просты, что их распараллеливание не имеет смысла (поскольку создание новых потоков имеет небольшую вычислительную стоимость, было бы глупо тратить время на создание потоков, когда один поток выполняет ту же работу в одно и то же время или меньше).
С другой стороны, многие задачи, которые выгодны для параллелизации, требуют обмена информацией между работниками, и для этого требуется много размышлений для правильной реализации. (Например, хотя решения для параллелизации симуляций молекулярной динамики эффективно, большинство симуляторов все еще вычисляют и обмениваются данными на отдельных этапах, а не в одно и то же время. Это просто трудно сделать правильно.)
Все это означает, что вы не можете ожидать, что сможете написать код, если не понимаете концепцию. Действительно, истинное понимание понятий - это трудная задача: писать код сравнительно легко.
Даже в приведенном выше примере есть определенные моменты отключения: Есть ли порядок отправки семафора и отпускания мьютекса? (Ну, это зависит от того, что поток, ожидающий завершения фрактала, - и действительно, если он еще ждет.) Если бы это была переменная условия вместо семафора, было бы важно, чтобы поток, который заинтересованный в завершении фрактала, ожидает переменную условия, иначе он будет пропускать сигнал/трансляцию. (Вот почему я использовал семафор.)
Используйте пул потоков (https://en.wikipedia.org/wiki/Thread_pool). –
То, что вы хотите, недостаточно ясно. Не могли бы вы рассказать? – shrike
@shrike Мне нужно делать бесконечное количество задач (одна задача в одном потоке), и я хотел бы знать, как ее реализовать с помощью pthreads в C. Это то, что вы хотели знать? –