2014-10-22 4 views
7

Мне нужно распараллелить некоторые задачи в программе на C++ и совершенно новое для параллельного программирования. До сих пор я добился определенного прогресса в интернет-поиске, но сейчас я немного застрял. Я хотел бы повторно использовать некоторые потоки в цикле, но явно не знаю, как делать то, что я пытаюсь сделать.Повторное использование потока в цикле C++

Я получаю данные с двух карт ADC на компьютере (приобретается параллельно), тогда мне нужно выполнить некоторые операции с собранными данными (обрабатываются параллельно) при сборе следующей партии данных. Вот какой-то псевдокод для иллюстрации

//Acquire some data, wait for all the data to be acquired before proceeding 
std::thread acq1(AcquireData, boardHandle1, memoryAddress1a); 
std::thread acq2(AcquireData, boardHandle2, memoryAddress2a); 
acq1.join(); 
acq2.join(); 

while(user doesn't interrupt) 
{ 

//Process first batch of data while acquiring new data 
std::thread proc1(ProcessData,memoryAddress1a); 
std::thread proc2(ProcessData,memoryAddress2a); 
acq1(AcquireData, boardHandle1, memoryAddress1b); 
acq2(AcquireData, boardHandle2, memoryAddress2b); 
acq1.join(); 
acq2.join(); 
proc1.join(); 
proc2.join(); 
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/ 
} 

Это основная его суть. Следующий запуск цикла будет записывать на «а» адреса памяти при обработке данных «b» и продолжать чередовать (я могу заставить код сделать это, просто вытащил его, чтобы предотвратить загромождение проблемы).

В любом случае проблема (как я уверен, некоторые люди уже могут сказать) заключается в том, что во второй раз, когда я пытаюсь использовать acq1 и acq2, компилятор (VS2012) говорит: «IntelliSense: вызов объекта типа класса без соответствующего оператора() или функций преобразования в тип указателя на функцию ". Точно так же, если я снова ставил std :: thread перед acq1 и acq2, он говорит: «ошибка C2374:« acq1 »: переопределение; множественная инициализация».

Итак, вопрос в том, могу ли я переназначить потоки новой задачи, когда они выполнили свою предыдущую задачу? Я всегда жду, когда предыдущее использование потока закончится, прежде чем вызывать его снова, но я не знаю, как переназначить поток, и поскольку он находится в цикле, я не могу создавать новый поток каждый раз (или если я может, это кажется расточительным и ненужным, но я мог ошибаться).

Заранее спасибо

ответ

0

Этот

std::thread acq1(...) 

является вызов конструктора. строительство нового объекта под названием acq1

Это

acq1(...) 

является применение оператора() на существующем объекте aqc1. Если такой оператор не определен для std :: thread, компилятор жалуется.

Насколько я знаю, вы не можете повторно использовать std :: threads. Вы строите и запускаете их. Присоединитесь к ним и выбросьте их,

+0

Спасибо Oncaphillis, между вашим ответом и luk32, я думаю, что понимаю, что произошло с моей синтаксической ошибкой. – notaCSmajor

0

Ну, это зависит от того, считаете ли вы пересадку переназначения или нет. Вы можете перемещать поток, но не копировать его.

Ниже код создаст новую пару нитей на каждой итерации и переместит их вместо старых потоков. Я предполагаю, что это должно работать, потому что новые объекты thread будут временными.

while(user doesn't interrupt) 
{ 
//Process first batch of data while acquiring new data 
std::thread proc1(ProcessData,memoryAddress1a); 
std::thread proc2(ProcessData,memoryAddress2a); 
acq1 = std::thread(AcquireData, boardHandle1, memoryAddress1b); 
acq2 = std::thread(AcquireData, boardHandle2, memoryAddress2b); 
acq1.join(); 
acq2.join(); 
proc1.join(); 
proc2.join(); 
/*Proceed in this manner, alternating which memory address 
is written to and being processed until the user interrupts the program.*/ 
} 

Что происходит есть объект на самом деле не заканчивается, это время жизни в конце итерации, поскольку она объявлена ​​во внешней области видимости, в отношении цикла. Но каждый раз создается новый объект, и происходит move. Я не вижу, что можно пощадить (я мог бы быть глупым), поэтому я предполагаю, что это точно так же, как объявление acq s внутри цикла и просто повторное использование символа. В целом ... да, это о том, как вы классифицируете создание временного и move.

Кроме того, это явно запускает новый поток в каждом цикле (конечно, заканчивая ранее назначенный поток), он не заставляет поток ждать новых данных и магически подавать их в обрабатывающий канал. Вам нужно будет реализовать его по-другому. Например: пул рабочих потоков и связь между очередями.

Литература: operator=, (ctor).

Я думаю, что ошибки, которые вы получаете, не требуют пояснений, поэтому я пропущу их объяснение.

+0

Спасибо, я попробовал это и, похоже, работает по мере необходимости. Время покажет, есть ли с ним какие-то проблемы, но на данный момент это значительно помогает! – notaCSmajor

9

Класс std::thread предназначен для выполнения только одной задачи (той, которую вы даете ей в конструкторе), а затем заканчивается. Если вы хотите сделать больше работы, вам понадобится новый поток. Начиная с C++ 11, это все, что у нас есть. Пулы потоков не вошли в стандарт. (Я не уверен, что C++ 14 может сказать о них.)

К счастью, вы можете легко реализовать требуемую логику самостоятельно. Вот крупномасштабная картина:

  • Начало п рабочие потоки, которые все делают следующее:
    • Repeat пока есть больше работы, чтобы сделать:
      • Grab следующая задача т (возможно, до тех пор, пока вы не будете готовы).
      • t.
  • Keep вставив новые задачи в очереди на обработку.
  • Сообщите рабочим рабочим, что больше нечего делать.
  • Ждите окончания рабочих потоков.

Самая сложная часть здесь (которая по-прежнему довольно проста) правильно проектирует рабочую очередь. Обычно для этого будет синхронизированный связанный список (из STL). Синхронизированный означает, что любой поток, который хочет управлять очередью, должен делать это только после того, как он приобрел std::mutex, чтобы избежать условий гонки. Если рабочий поток находит список пустым, он должен ждать, пока не будет некоторая работа. Для этого вы можете использовать std::condition_variable. Каждый раз, когда новая задача вставлена ​​в очередь, вставляющий поток уведомляет поток, который ждет переменную условия и поэтому прекратит блокировку и в конечном итоге начнет обработку новой задачи.

Вторая не так-тривиальная часть - это сигнал рабочим потокам, что больше не нужно делать. Ясно, что вы можете установить некоторый глобальный флаг, но если рабочий заблокирован в очереди, он не будет реализовываться в ближайшее время. Одно из решений может состоять из notify_all() потоков и проверять флаг каждый раз, когда они уведомляются. Другой вариант - вставить в очередь несколько «токсичных» предметов. Если рабочий столкнулся с этим элементом, он завершает работу.

Представление очереди задач прямолинейно, используя ваши самоопределяемые объекты task или просто лямбды.

Все перечисленное относится к функциям C++ 11. Если вы придерживаетесь более ранней версии, вам нужно обратиться к сторонним библиотекам, которые обеспечивают многопоточность для вашей конкретной платформы.

Хотя все это не наука о ракетах, в первый раз все еще легко ошибиться. И, к сожалению, ошибки, связанные с параллелизмом, являются одними из самых сложных для отладки. Начиная с нескольких часов чтения через соответствующие разделы хорошей книги или работы через учебник можно быстро окупиться.

+0

Спасибо за этот ответ, очень тщательно и хорошо написано. Я собираюсь сохранить это как план B, если ответ luk32 ниже не подходит для меня. Ваш, вероятно, является «правильным» способом заниматься вещами. – notaCSmajor

15

Самый простой способ - использовать ожидающую очередь std::function объектов. Как это:

#include <iostream> 
#include <thread> 
#include <mutex> 
#include <condition_variable> 
#include <queue> 
#include <functional> 
#include <chrono> 


class ThreadPool 
{ 
    public: 

    ThreadPool (int threads) : shutdown_ (false) 
    { 
     // Create the specified number of threads 
     threads_.reserve (threads); 
     for (int i = 0; i < threads; ++i) 
      threads_.emplace_back (std::bind (&ThreadPool::threadEntry, this, i)); 
    } 

    ~ThreadPool() 
    { 
     { 
      // Unblock any threads and tell them to stop 
      std::unique_lock <std::mutex> l (lock_); 

      shutdown_ = true; 
      condVar_.notify_all(); 
     } 

     // Wait for all threads to stop 
     std::cerr << "Joining threads" << std::endl; 
     for (auto& thread : threads_) 
      thread.join(); 
    } 

    void doJob (std::function <void (void)> func) 
    { 
     // Place a job on the queu and unblock a thread 
     std::unique_lock <std::mutex> l (lock_); 

     jobs_.emplace (std::move (func)); 
     condVar_.notify_one(); 
    } 

    protected: 

    void threadEntry (int i) 
    { 
     std::function <void (void)> job; 

     while (1) 
     { 
      { 
       std::unique_lock <std::mutex> l (lock_); 

       while (! shutdown_ && jobs_.empty()) 
        condVar_.wait (l); 

       if (jobs_.empty()) 
       { 
        // No jobs to do and we are shutting down 
        std::cerr << "Thread " << i << " terminates" << std::endl; 
        return; 
       } 

       std::cerr << "Thread " << i << " does a job" << std::endl; 
       job = std::move (jobs_.front()); 
       jobs_.pop(); 
      } 

      // Do the job without holding any locks 
      job(); 
     } 

    } 

    std::mutex lock_; 
    std::condition_variable condVar_; 
    bool shutdown_; 
    std::queue <std::function <void (void)>> jobs_; 
    std::vector <std::thread> threads_; 
}; 

void silly (int n) 
{ 
    // A silly job for demonstration purposes 
    std::cerr << "Sleeping for " << n << " seconds" << std::endl; 
    std::this_thread::sleep_for (std::chrono::seconds (n)); 
} 

int main() 
{ 
    // Create two threads 
    ThreadPool p (2); 

    // Assign them 4 jobs 
    p.doJob (std::bind (silly, 1)); 
    p.doJob (std::bind (silly, 2)); 
    p.doJob (std::bind (silly, 3)); 
    p.doJob (std::bind (silly, 4)); 
} 
+0

Приятное решение KISS без дополнительных отделений. – Surt

0

Я думаю, что вам нужно гораздо более простой ответ для запуска набора нитей больше, чем когда-то, это самое лучшее решение:

do{ 

    std::vector<std::thread> thread_vector; 

    for (int i=0;i<nworkers;i++) 
    { 
     thread_vector.push_back(std::thread(yourFunction,Parameter1,Parameter2, ...)); 
    } 

    for(std::thread& it: thread_vector) 
    { 
     it.join(); 
    } 
    q++; 
} while(q<NTIMES); 
0

Вы также можете создать свой собственный класс Thread и назовите его метод прогона следующим образом:

class MyThread 
{ 
public: 
void run(std::function<void()> func) { 
    thread_ = std::thread(func); 
} 
void join() { 
    if(thread_.joinable()) 
     thread_.join(); 
} 
private: 
    std::thread thread_; 
}; 

// Application code... 
MyThread myThread; 
myThread.run(AcquireData);