2016-03-06 7 views
2

У меня есть следующий код на C++. Код из C++ Concurrency In Action: Practical MultithreadingНазначение новой задачи потоку после завершения потока в C++

void do_work(unsigned id); 

void f() { 
    std::vector<std::thread> threads; 
    for(unsigned i = 0; i < 20; ++i) { 
     threads.push_back(std::thread(do_work, i)); 
    } 
    std::for_each(threads.begin(), threads.end(), std::mem_fn(&std::thread::join)); 
} 

Предположим, что потоки [0] завершил обработку и возвращает значение. У меня все еще есть больше файлов для обработки, и теперь я хочу назначить этот новый файл потоку, который будет завершен. Как я могу достичь такого поведения на C++? Или я должен уничтожить поток и теперь создать новый при завершении потока? Но как я могу проверить, завершен ли какой-либо из этих потоков?

+0

Lookup на реализации пула потоков. Вам понадобится еще один уровень косвенности передачи рабочей нагрузки. –

+0

Вам нужно будет иметь возможность передать работу в ваши потоки (через очередь или другие средства). Это значительно отличается от того, что вы показали. Это было бы хорошим прецедентом для проактора, например 'boost :: asio' – Chad

ответ

2

Короткий ответ на вопрос «Как я могу добиться такого поведения в C++», просто напишите код, чтобы сделать это. Первым шагом, который вы определили сами, является «как проверить, завершен ли какой-либо из этих потоков».

Существует несколько основных подходов. Но все они сводятся к одному и тому же: вместо того, чтобы каждый поток просто уходил, перед тем как заканчивается каждый поток, он уведомляет родительский процесс о том, что это сделано.

Для начала каждая нить должна знать, какой поток она есть. В вашем примере все потоки помещаются в std::vector, и они идентифицируются индексом вектора. Это не единственный способ сделать это. Есть и другие способы, чтобы пасти всех нитей, но для целей ответа это будет сделано.

Затем каждый поток должен знать, какой именно индекс, передавая индекс индекса потока в качестве параметра потока. Который ваш код уже делает. Замечательно.

Теперь, чтобы просто закрыть конец цикла: вам просто нужно создать экземпляр std::mutex с std::condition_variable, который защищая std::queue, или std::list. Или, возможно, std::set целых чисел. Вы можете решить, какой контейнер лучше для вас.

Затем перед каждым поток завершается, он:

  • блокирует семафор.

  • помещает свой индекс резьбы в контейнер.

  • сигнализирует переменную условия.

  • разблокирует мьютексы, а затем он сразу же возвращается, завершая эту тему.

Затем родительский поток, который начал все темы:

  • запирает мьютекс

  • проверки, если очередь/комплект/все пусто. Если это так, он ждет переменную условия, пока это не будет.

  • удаляет индекс потока из очереди/set/whatever и присоединяет этот поток. Этот поток только что закончился. Теперь вы знаете, какой поток был прерван, и можете делать все, что хотите, с помощью этой информации.

  • после завершения обработки или перезапуска потока, он проверяет снова, если очередь пуста.

1

Вот базовая реализация того, что объяснил Сэм Варшавчик.

Live demo

Причина, почему я добавил local_queue, чтобы убедиться, что наша m_Mutex разблокирован сразу. Если вы удалите его, поток, который вызывает push_task, может блокировать.

Деструктор вызывает stop(), который устанавливает m_Running в false, уведомляет об этом ничью и ждет завершения обработки всех оставшихся задач.

Если рабочий класс умирает, нить тоже умирает, что хорошо.

Мой пример создает только 3 темы и 5 заданий на поток for (int i = 0; i < 5; i++), в основном, чтобы убедиться, что все выходные показано на ideone, но я проверил это с 10 нитями и 5000 задач на поток, и он побежал отлично.

Функция do_work имеет две линии, которые можно раскомментировать, если вы хотите, чтобы выходной поток был правильно синхронизирован. Этот класс поддерживает многопоточность.

Вы можете stop() и повторно start() нить столько раз, сколько вам нравится

class Worker 
{ 
public: 
    Worker(bool start) : m_Running(start) { if (start) private_start(); } 
    Worker() : m_Running(false) { } 
    ~Worker() { stop(); } 

    template<typename... Args> 
    void push_task(Args&&... args) 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      m_Queue.push_back(std::bind(std::forward<Args>(args)...)); 
     } 

     m_Condition.notify_all(); 
    } 

    void start() 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      if (m_Running == true) return; 
      m_Running = true; 
     } 

     private_start(); 
    } 

    void stop() 
    { 
     { 
      std::lock_guard<std::mutex> lk(m_Mutex); 
      if (m_Running == false) return; 
      m_Running = false; 
     } 

     m_Condition.notify_all(); 
     m_Thread.join(); 
    } 

private: 
    void private_start() 
    { 
     m_Thread = std::thread([this] 
     { 
      for (;;) 
      { 
       decltype(m_Queue) local_queue; 
       { 
        std::unique_lock<std::mutex> lk(m_Mutex); 
        m_Condition.wait(lk, [&] { return !m_Queue.empty() + !m_Running; }); 

        if (!m_Running) 
        { 
         for (auto& func : m_Queue) 
          func(); 

         m_Queue.clear(); 
         return; 
        } 

        std::swap(m_Queue, local_queue); 
       } 

       for (auto& func : local_queue) 
        func(); 
      } 
     }); 
    } 

private: 
    std::condition_variable m_Condition; 
    std::list<std::function<void()>> m_Queue; 
    std::mutex m_Mutex; 
    std::thread m_Thread; 
    bool m_Running = false; 
}; 

void do_work(unsigned id) 
{ 
    //static std::mutex cout_mutex; 
    //std::lock_guard<std::mutex> lk(cout_mutex); 
    std::cout << id << std::endl; 
} 

int main() 
{ 
    { 
     Worker workers[3]; 
     int counter = 0; 

     for (auto& worker : workers) 
      worker.start(); 

     for (auto& worker : workers) 
     { 
      for (int i = 0; i < 5; i++) 
       worker.push_task(do_work, ++counter + i); 
     } 
    } 

    std::cout << "finish" << std::endl; 
    getchar(); 

    return 0; 
}