1

Предположим, что существует ряд потоков, которые состоят из циклов, запускающих экземпляры одной и той же функции, но начало каждой итерации необходимо синхронизировать (так что потоки, которые заканчиваются сначала, должны ждать, пока последний начнет новую итерацию). Как это можно сделать в C++ 11?Как синхронизировать экземпляры функции, выполняющейся на разных потоках (в C++ 11)?

...

Остальная часть поста только то, что я пытался, и как она выходит из строя.

Я использую счетчик «sync», изначально установленный на 3 (количество потоков). Каждый поток, в конце функции, вычитает 1 из этого счетчика и начинает ждать. Когда счетчик достигает 0, это означает, что 3 из них закончили один раунд, поэтому основной поток сбросит счетчик на 3 и сообщит о потоках, чтобы разбудить их.

Это работает большую часть времени, но иногда один или два потока не могут проснуться.

Так что эти глобальные переменные:

mutex syncMutex; 
condition_variable syncCV; 
int sync; 

Это в конце функции, которая выполняется в цикле в резьбе:

unique_lock<mutex> lk(syncMutex); 
cout << "Thread num: " << mFieldNum << " got sync value: " << sync; 
sync --; 
syncCV.notify_all(); 
cout << " and goes to sleep..." << endl; 
syncCV.wait(lk, []{return sync == numFields;}); 
cout << "Thread num: " << mFieldNum << " woke up" << endl; 
} 

И это работает в цикле в основной поток:

unique_lock<mutex> lk(syncMutex); 
syncCV.wait(lk, []{return sync == 0;}); 
sync = 3; 
lk.unlock(); 
cout << "Notifying all threads!" << endl; 
syncCV.notify_all(); 

Это выход он производит, когда он выходит из строя (поток # 3 не просыпаются):

Thread num: 1 got sync value: 3 and goes to sleep... 
Thread num: 2 got sync value: 2 and goes to sleep... 
Thread num: 3 got sync value: 1 and goes to sleep... 
Notifying all threads! 
Thread num: 1 woke up 
Thread num: 2 woke up 
Thread num: 3 woke up 
Thread num: 2 got sync value: 3 and goes to sleep... 
Thread num: 1 got sync value: 2 and goes to sleep... 
Thread num: 3 got sync value: 1 and goes to sleep... 
Notifying all threads! 
Thread num: 2 woke up 
Thread num: 1 woke up 
Thread num: 2 got sync value: 3 and goes to sleep... 
Thread num: 1 got sync value: 2 and goes to sleep... 

У кого-нибудь есть ключ? Спасибо за чтение.

+0

Поскольку каждый поток выполняется в цикле, после того, как поток 1 или 2 просыпался, sync-- был выполнен, прежде, чем нити 3 [] {возвращение синхронизации == numFields;} предикат был выполнен. Предикат был оценен как false, поэтому поток 3 не проснулся. –

+0

Спасибо @TonyJ Знаете ли вы, как это можно исправить? – siflun

ответ

1

С вашей синхронизацией потоков существует ряд проблем. Тони упомянул об этом в своем комментарии. У вас также есть потенциальное состояние гонки в вашем основном коде цикла, где вы вызываете lk.unlock() перед вызовом syncCV.notify_all(). (Это может позволить потоку пропустить сигнал notify_all.)

Я бы скорректировал ваш код двумя способами. Во-первых, для обращения к использованию «sync == numFields» в качестве вашего условия, которое, как заметил Тони, может оказаться недействительным после того, как другой поток выполнил синхронизацию, имеет смысл использовать в качестве вашего условия, что каждый поток выполняется только один раз в петле основного потока. В моем примере кода это достигается путем введения переменных «done [numFields]». Во-вторых, имеет смысл ввести две переменные условия: одну, чтобы сигнализировать рабочие потоки, что началась новая итерация основного цикла, а вторая - сигнал основного потока, который выполняется рабочими потоками. (Обратите внимание на то, что две переменные состояние используют один и тот же семафор.)

Вот полная программа, по образцу вашего образца кода, который включает в себя эти два подхода:

#include <iostream> 
using std::cout; 
using std::endl; 

#include <condition_variable> 
#include <mutex> 
#include <thread> 
#include <vector> 

std::mutex syncMutex; 
std::condition_variable readyCV; 
std::condition_variable doneCV; 
int sync; 
bool exitFlag; 

const int numFields = 5; 
bool done[numFields]; 

const int nloops = 10; 

void thread_func(int i) { 
    int mFieldNum = i; 
    while (true) { 
    std::unique_lock<std::mutex> lk(syncMutex); 
    readyCV.wait(lk, [mFieldNum]{return exitFlag || !done[mFieldNum-1];}); 
    if (exitFlag) break; 
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << sync; 
    if (--sync == 0) doneCV.notify_all(); 
    done[mFieldNum-1] = true; 
    readyCV.notify_all(); 
    cout << " and goes to sleep..." << endl; 
    } 
} 

int main (int argc, char* argv[]) { 
    exitFlag = false; 
    sync = 0; 
    std::vector<std::thread> threads; 
    for (int i = 0; i < numFields; i++) { 
    done[i] = true; 
    threads.emplace_back (thread_func, i+1); 
    } 
    for (int i = 0; i <= nloops; i++) { 
    std::unique_lock<std::mutex> lk(syncMutex); 
    doneCV.wait(lk, []{return sync == 0;}); 
    cout << "main loop (lk held), i = " << i << endl; 
    sync = numFields; 
    if (i == nloops) exitFlag = true; 
    else    for (auto &b : done) b = false; 
    cout << "Notifying all threads!" << endl; 
    readyCV.notify_all(); 
    } 

    for (auto& t : threads) t.join(); 
} 

(я также добавил exitFlag и std :: thread :: join(), поэтому программа может очистить и прекратить красиво.)

Это очень похоже на классическую реализацию производителя-потребителя (один производитель, потребители numFields) с добавленным ограничением что каждый потребительский поток будет запускаться только один раз в каждом потоке потока производителей.

Вы также можете добиться практически той же программной логики более просто, если хотите отказаться от повторного использования рабочих потоков.(В вашем примере кода и моем примере выше они действуют как разновидность специализированного пула потоков.) В следующем примере новые потоки создаются для каждой итерации основного цикла. Это упрощает синхронизацию потоков и исключает переменные условия.

#include <iostream> 
using std::cout; 
using std::endl; 

#include <atomic> 
#include <mutex> 
#include <thread> 
#include <vector> 

std::mutex coutMutex; 

std::atomic<int> sync; 

const int numFields = 5; 
bool done[numFields]; 

const int nloops = 10; 

void thread_func(int i) { 
    int mFieldNum = i; 
    int mySync = sync--; 
    { 
    std::lock_guard<std::mutex> lk(coutMutex); 
    cout << "Thread num: " << mFieldNum << " woke up, got sync value: " << mySync << endl; 
    } 
} 

int main (int argc, char* argv[]) { 
    for (int i = 0; i < nloops; i++) { 
    cout << "main loop, i = " << i << endl; 
    std::vector<std::thread> threads; 
    sync = numFields; 
    for (int i = 0; i < numFields; i++) threads.emplace_back (thread_func, i+1); 
    for (auto& t : threads) t.join(); 
    } 
} 

(coutMutex является тонкость, так что выход консоли не искажаются, но это не является необходимым для логики синхронизации ядра.)

Если в вашем реальном мире прецеденту вы не» t нужно, чтобы thread_func оставался в живых от итерации до итерации (например, для сохранения некоторого состояния), и если каждый вызов thread_func достаточно работает, то затраты на создание нового потока для его запуска на самом деле не имеют значения в сравнении, тогда создание новых потоков для каждой итерации основного цикла (вместо повторного использования потоков) является простым, разумным и простым.

Happy Multi-Threaded Hacking!

К. Франк

+0

Спасибо! Это был мой первый вопрос здесь, и ваша помощь действительно хорошо говорит об этом сообществе. После многих часов разочарования я также нашел решение, подобное вашему. Я сделал переменную синхронизации вектором, чтобы каждый поток указывал, что он закончил, установив свою позицию в векторе на ноль, а затем основной поток подождал, чтобы увидеть все нули для сброса. – siflun

 Смежные вопросы

  • Нет связанных вопросов^_^