2017-02-18 25 views
2

У меня есть n количество заданий, между которыми нет общего ресурса, и m тем. Я хочу эффективно делить количество заданий в потоках таким образом, чтобы не было простаивающего потока, пока все не обработано?Разделительная работа между фиксированным числом потоков с pthread

Это прототип моей программы:

class Job { 
    //constructor and other stuff 
    //... 

    public: doWork(); 
}; 

struct JobParams{ 
    int threadId; 
    Job job; 
}; 

void* doWorksOnThread(void* job) { 
    JobParams* j = // cast argument 
    cout << "Thread #" << j->threadId << " started" << endl; 
    j->job->doWork(); 
    return (void*)0; 
} 

Тогда в моем главном файле у меня есть что-то вроде:

int main() { 
    vector<Job> jobs; // lets say it has 17 jobs 
    int numThreads = 4; 

    pthread_t* threads = new pthread_t[numThreads]; 
    JobParams* jps = new JubParams[jobs.size()]; 

    for(int i = 0; i < jobs.size(); i++) { 
    jps[i]->job = jobs[i];   
    } 

    for(int i = 0; i < numThread; i++) { 
    pthread_create(&t[i], null, doWorkOnThread, &jps[0]) 
    } 

    //another for loop and call join on 4 threads... 

    return 0; 
} 

как я могу эффективно убедиться, что нет простоя нити, пока все рабочие места завершены?

+0

Почему вы используете pthreads вместо 'std :: thread'? Почему вы создаете массив с 'new' вместо использования' std :: vector'? Вы делаете код более сложным, чем необходимо. –

ответ

0

Вам нужно будет добавить цикл, чтобы идентифицировать потоки, которые были завершены, а затем запустить новые, чтобы всегда иметь до 4 потоков.

Вот очень простой способ сделать это. Использование сна, как было предложено, может быть хорошим началом и будет выполнять эту работу (даже если добавить дополнительную задержку, прежде чем вы выясните последний завершенный поток). В идеале вы должны использовать condition variable, уведомленный потоком, когда задание выполнено для пробуждения основного цикла (тогда инструкция сна будет заменена инструкцией условия ожидания).

struct JobParams{ 
    int threadId; 
    Job job; 
    std::atomic<bool> done; // flag to know when job is done, could also be an attribute of Job class! 
}; 

void* doWorksOnThread(void* job) { 
    JobParams* j = // cast argument 
    cout << "Thread #" << j->threadId << " started" << endl; 
    j->job->doWork(); 
    j->done = true; // signal job completed 
    return (void*)0; 
} 

int main() { 
    .... 

    std::map<JobParams*,pthread_t*> runningThreads; // to keep track of running jobs 

    for(int i = 0; i < jobs.size(); i++) { 
     jps[i]->job = jobs[i]; 
     jps[i]->done = false; // mark as not done yet 
    } 

    while (true) 
    { 
     vector<JobParams*> todo; 
     for(int i = 0; i < jobs.size(); i++) 
     { 
      if (!jps[i]->done) 
      { 
       if (runningThreads.find(jps[i]) == runningThreads.end()) 
        todo.push_back(&jps[i]); // job not started yet, mask as to be done 
       // else, a thread is already processing the job and did not complete it yet 
      } 
      else 
      { 
       if (runningThreads.find(jps[i]) != runningThreads.end()) 
       { 
        // thread just completed the job! 
        // let's join to wait for the thread to end cleanly 
        // I'm not familiar with pthread, hope this is correct 
        void* res; 
        pthread_join(runningThreads[jps[i]], &res); 
        runningThreads.erase(jps[i]); // not running anymore 
       } 
       // else, job was already done and thread joined from a previous iteration 
      } 
     } 

     if (todo.empty() && runningThreads.empty()) 
      break; // done all jobs 

     // some jobs remain undone 

     if (runningThreads.size() < numThreads && !todo.empty()) 
     { 
      // some new threads shall be started... 

      int newThreadsToBeCreatedCount = numThreads - runningThreads.size(); 
      // make sure you don't end up with too many threads running 
      if (todo.size() > newThreadsToBeCreatedCount) 
       todo.resize(newThreadsToBeCreatedCount); 

      for (auto jobParam : todo) 
      { 
       pthread_t* thread = runningThreads[&jobParam]; 
       pthread_create(thread, null, doWorkOnThread, &jobParam); 
      } 
     } 
     // else: you already have 4 runnign jobs 

     // sanity check that everything went as expected: 
     assert(runningThreads.size() <= numThreads); 

     msleep(100); // give a chance for some jobs to complete (100ms) 
         // adjust sleep duration if necessary 
    } 
} 

Примечание: Я не очень хорошо знаком с pthread. Надеюсь, что синтаксис верен.