2014-10-07 4 views
2

Я пытался реализовать простой барьер в моем коде, который выглядит следующим образом:Каков правильный способ реализации барьера потока и сброс барьера в C?

void waitOnBarrier(int* barrier, int numberOfThreads) { 
    atomicIncrement(barrier); // atomic increment implemented in assembly 
    while(*barrier < numberOfThreads); 
} 

И тогда есть использование барьера в коде:

int g_barrier = 0; // a global variable 

waitOnBarrier(&g_barrier, someKnownNumberOfThreads); 

До сих пор так хорошо, но там, где должен Я возвращаю переменную g_barrier обратно к нулю? Если я пишу что-то вроде

g_barrier = 0; 

прямо после waitOnBarrier вызова, я буду иметь проблемы, если один из потоков будет выпущен быстрее, чем другие от барьера и свести на нет g_barrier в то время как все другие потоки все еще выполняя петлевые инструкции, поэтому в конечном итоге они будут застрять на барьере навсегда.

Объяснение: waitOnBarrier будет компилировать в нечто вроде этого (псевдокод):

1: mov rax, numberOfThreads 
2: mov rbx, [barrier] 
3: cmp rax, rbx 
4: jmp(if smaller) to 2 

Так что, если у нас есть 2 темы с синхронизацией на барьере, и thread_1 медлительность где-то в инструкции 3 или 4 , а более быстрый thread_2 достигает барьера, передает его и продолжается до g_barrier поток аннулирования. Это означает, что после thread_1 достигнет инструкции 2, он увидит нулевое значение на [барьер] и навсегда застрянет на барьере!

Вопрос в том, как я должен аннулировать g_barrier, какое место для него в коде «достаточно далеко», что я могу быть уверен, что к тому времени все потоки покинули барьер? Или есть более правильный способ реализации барьера?

+0

Есть ли причина, по которой вы не хотите использовать библиотеку синхронизации потоков, поставляемую с вашей операционной системой? – ArjunShankar

+0

Да, мой код работает в определенной среде, а не в обычной ОС. – user1483597

+0

И есть ли какая-то конкретная причина для внедрения этой блокировки? – ArjunShankar

ответ

2

Барьеры на самом деле довольно сложно реализовать, основная причина заключается в том, что новые официанты могут начать прибывать до того, как все старые официанты имеют шанс выполнить, что исключает любую простую реализацию на основе счета. Мое предпочтительное решение состоит в том, чтобы сам барьерный объект просто указывал на «текущий барьерный экземпляр», который существует в стеке первого потока, поступающего на барьер, и который также будет последним нитью (поскольку он не может уйти, пока другой потоки все еще ссылаются на его стек).Очень хороший пример реализации с точки зрения PTHREAD примитивов (которая может быть адаптирована к C11 запирающих примитивов или все, что вы должны работать с) включается в ответ Майкл Барра в моем прошлом вопрос по теме:

https://stackoverflow.com/a/5902671/379897

Да, это похоже на большую работу, но написать барьерную реализацию, которая действительно удовлетворяет контракту барьера, нетривиальна.

1

Не перезагружайте barrier переменные в ноль.

Когда какой-либо из нити вот-вот выйдет, атомно уменьшите переменную barrier на единицу.

Ваш барьер выглядит так, что вы не хотите, чтобы количество рабочих нитей, порожденных, упало ниже numberOfThreads.

0

Я столкнулся с этим вопросом, пытаясь сделать что-то подобное, поэтому я решил поделиться своим решением, если кто-то найдет его полезным. Он реализован в чистом C++ 11 (к сожалению, не C11, поскольку многопоточная часть стандарта не поддерживается в gcc и msvc).

В основном, вы поддерживаете два счетчика, использование которых чередуется. Ниже приведен пример внедрения и использования:

#include <cstdio> 
    #include <thread> 
    #include <condition_variable> 

    // A barrier class; The barrier is initialized with the number of expected threads to synchronize 
    class barrier_t{ 
     const size_t count; 
     size_t counter[2], * currCounter; 
     std::mutex mutex; 
     std::condition_variable cv; 

    public: 
     barrier_t(size_t count) : count(count), currCounter(&counter[0]) { 
      counter[0] = count; 
      counter[1] = 0; 
     } 
     void wait(){ 
      std::unique_lock<std::mutex> lock(mutex); 
      if (!--*currCounter){ 
       currCounter += currCounter == counter ? 1 : -1; 
       *currCounter = count; 
       cv.notify_all(); 
      } 
      else { 
       size_t * currCounter_local = currCounter; 
       cv.wait(lock, [currCounter_local]{return *currCounter_local == 0; }); 
      } 
     } 
    }; 

    void testBarrier(size_t iters, size_t threadIdx, barrier_t *B){ 
     for(size_t i = 0; i < iters; i++){ 
      printf("Hello from thread %i for the %ith time!\n", threadIdx, i); 
      B->wait(); 
     } 
    } 

    int main(void){ 
     const size_t threadCnt = 4, iters = 8; 
     barrier_t B(threadCnt); 
     std::thread t[threadCnt]; 
     for(size_t i = 0; i < threadCnt; i++) t[i] = std::thread(testBarrier, iters, i, &B); 
     for(size_t i = 0; i < threadCnt; i++) t[i].join(); 
     return 0; 
    } 

 Смежные вопросы

  • Нет связанных вопросов^_^