2016-05-19 3 views
3

Я реализую программу quicksort с помощью метода multi-thread в C++ с задачей Портфолио.Быстрая сортировка с помощью многопоточности в C++

Метод задач портфеля заключается в поддержании очереди задач. Каждый свободный поток выбирает задачу из портфеля, выполняет его, если необходимо генерировать новые подзадачи и размещение их в портфеле

Но я не уверен, что это правильно! Мне кажется, что в одном thread алгоритм работает быстрее, чем два или четыре thread. Могу ли я каким-то образом испортить синхронизацию?

Спасибо за помощь.

Код:

#include <thread> 
#include <chrono> 
#include <mutex> 
#include <condition_variable> 
#include <iostream> 
#include <queue> 
#include <vector> 
#include <set> 
#include <ctime> 
#include <algorithm> 

using namespace std; 

//print array 
template <typename T> 
void print(const vector<T> &arr) 
{ 
    for (size_t i = 0; i < arr.size(); i++) 
     cout << arr[i] << " "; 
    cout << endl; 
} 

//queue tasks 
queue< pair<int, int> > tasks; 
//mutexs for set and queue task 
mutex q_mutex, s_mutex; 
//condition variable 
condition_variable cv; 
//set 
set<int> ss; 

//partition algorithm 
template <typename T> 
int partition(vector<T> &arr, int l, int r) 
{ 
    T tmp = arr[r]; //as pivot element 
    int i = l - 1; 

    for (int j = l; j <= r - 1; j++) 
     if (arr[j] < tmp) 
     { 
      i++; 
      swap(arr[i], arr[j]);  
     } 

    swap(arr[i + 1], arr[r]); 
    i++; 
    return i; 
} 

//quick sort 
template <typename T> 
void quick_sort(vector<T> &arr) 
{ 
    while (true) 
    { 
     unique_lock<mutex> u_lock(q_mutex); //lock mutex 

     //sort is fineshed 
     if (ss.size() == arr.size()) //u_lock.unlock() 
      return; 

     //if queue task is not empty 
     if (tasks.size() > 0) 
     { 
      //get task from queue 
      pair<int, int> cur_task = tasks.front();    
      tasks.pop(); 

      int l = cur_task.first, r = cur_task.second;   

      if (l < r) 
      { 
       int q = partition(arr, l, r); //split array 

       //Add indexes in set 
       s_mutex.lock(); 
       ss.insert(q); 
       ss.insert(l); 
       ss.insert(r); 
       s_mutex.unlock(); 

       //push new tasks for left and right part 
       tasks.push(make_pair(l, q - 1)); 
       tasks.push(make_pair(q + 1, r)); 

       //wakeup some thread which waiting 
       cv.notify_one(); 
      } 
     } 
     else 
      //if queue is empty 
      cv.wait(u_lock); 
    } 
} 

//Size array 
const int ARR_SIZE = 100000; 
//Count threads 
const int THREAD_COUNT = 8; 

thread thrs[THREAD_COUNT]; 

//generatin array 
void generate_arr(vector<int> &arr) 
{ 
    srand(time(NULL)); 

    std::generate(arr.begin(), arr.end(), [](){return rand() % 10000; }); 
} 

//check for sorting 
bool is_sorted(const vector<int> &arr) 
{ 
    for (size_t i = 0; i < arr.size() - 1; i++) 
     if (! (arr[i] <= arr[i + 1])) 
      return false; 
    return true; 
} 

int main() 
{ 
    //time 
    clock_t start, finish; 

    vector<int> arr(ARR_SIZE); 

    //generate array 
    generate_arr(arr); 

    cout << endl << "Generating finished!" << endl << endl; 

    cout << "Array before sorting" << endl << endl; 

    //Before sorting 
    print(arr); 

    cout << endl << endl; 

    cout << "Checking is_sorted finished! The result is " << (is_sorted(arr) == 0? "false": "true") << "." << endl << endl; 

    //add task 
    tasks.push(make_pair(0, arr.size() - 1)); 

    //================================================== 
    start = clock(); 

    for (int i = 0; i < THREAD_COUNT; i++) 
     thrs[i] = thread(quick_sort<int>, ref(arr)); 

    finish = clock(); 
    //================================================== 

    for (auto& th : thrs) 
     th.join(); 

    cout << "Sorting finished!" << endl << endl; 

    cout << "Array after sorting" << endl << endl; 
    //After sorting 
    print(arr); 

    cout << endl << endl; 

    cout << "Checking is_sorted finished! The result is " << (is_sorted(arr) == 0? "false": "true") << "." << endl << endl; 

    cout << "Runtime: " << (double)(finish - start)/CLOCKS_PER_SEC << endl; 

    return 0; 
} 
+0

Hm, делает эту строку 'unique_lock u_lock' конвертирует ваш параллельный код в последовательный? – fghj

+0

Тот же вопрос по ru.SO: [http://ru.stackoverflow.com/questions/525498/Быстрая-сортировка-и-многопоточность-c](http://ru.stackoverflow.com/questions/525498/%D0 % 91% D1% 8B% D1% 81% D1% 82% D1% 80% D0% B0% D1% 8F-% D1% 81% D0% BE% D1% 80% D1% 82% D0% B8% D1% 80% D0% BE% D0% B2% D0% BA% D0% B0-% D0% B8-% D0% БК% D0% BD% D0% BE% D0% B3% D0% BE% D0% BF% D0% BE% D1% 82% D0% BE% D1% 87% D0% BD% D0% BE% D1% 81% D1% 82% D1% 8C-c) –

+0

Просто хочу указать, что параллельные операции сортировки уже существуют: http : //en.cppreference.com/w/cpp/experimental/parallelism/existing#Sorting_operations –

ответ

6

Есть много других факторов на производительность, чем просто, сколько потоков вы бросаете на эту проблему. Среди них

  • Необходимо иметь фактический параллелизм, а не только несколько потоков. Как наблюдали @ Rakete1111 и @ user1034749, вы этого не делаете.

  • Стандартная quicksort имеет хорошую локальность ссылок, особенно если размеры разделов становятся маленькими, но ваша техника сильно уходит из-за того, что ответственность за данный элемент массива, скорее всего, будет заменена на другой поток при каждом разбиении.

  • Кроме того, операции с мьютексом не являются особенно дешевыми, и вы начинаете делать довольно много из них относительно количества фактической сортировки, когда разделы становятся маленькими.

  • Нет смысла использовать больше потоков, чем у вас есть физические ядра. Четыре потока, вероятно, не слишком много, но это зависит от вашего оборудования.

Вот несколько способов, вы можете улучшить свою многопоточную производительность:

  1. В методе quick_sort(), не держать мьютекс q_mutex заблокирован во время фактической сортировки, так как в настоящее время вы делаете (в unique_lock конструкторе вы используете блокировку мьютекса, и вы не разблокируете его в течение всего срока службы unique_lock).

  2. Переключение на обычную рекурсивную технику для разделов, меньших некоторого порогового размера. Вам нужно будет протестировать, чтобы найти хорошее определенное пороговое значение; возможно, его нужно перестраивать.

  3. При каждом разбиении каждый поток помещает только один из подсетей в портфолио; пусть он обрабатывает другой рекурсивно - или лучше, итеративно. Фактически, сделайте это меньшим подсегментом, который вы публикуете, так как это поставит лучшую оценку размера портфеля.

Вы также можете рассмотреть возможность увеличения количества элементов, на которых вы запускаете свой тест.100000 на самом деле не так много, и вы можете увидеть разные характеристики производительности для больших проблем. 1000000 элементов вовсе не является необоснованным для такого теста на современном оборудовании.

+1

Кроме того, его быстрый алгоритм сортировки всегда работает на 1 поток, поэтому другие потоки в основном просто ничего не делают. – Rakete1111

+0

Спасибо вам за совет! – rekrut

+0

@ Rakete1111 Итак, как изменить код, чтобы другие потоки заработали? Я не понимаю. – rekrut

1

Мне кажется, вы должны зафиксировать поведение задач портфолио в класс.

template <typename TASK, unsigned CONCURRENCY> 
class Portfolio { 
    std::array<std::thread, CONCURRENCY> workers_; 
    std::deque<TASK> tasks_; 
    std::mutex m_; 
    std::condition_variable cv_; 
    std::atomic<bool> quit_; 
    void work() { 
     while (!quit_) { 
      TASK t = get(); 
      if (quit_) break; 
      t(); 
     } 
    } 
    TASK get() { 
     std::unique_lock<std::mutex> lock(m_); 
     while (tasks_.empty()) { 
      cv_.wait(lock); 
      if (quit_) return TASK(); 
     } 
     TASK t = tasks_.front(); 
     tasks_.pop_front(); 
     if (!tasks_.empty()) cv_.notify_one(); 
     return t; 
    } 
public: 
    void put (TASK t) { 
     std::unique_lock<std::mutex> lock(m_); 
     tasks_.push_back(t); 
     cv_.notify_one(); 
    } 
    Portfolio(); 
    ~Portfolio(); 
}; 

Конструктор будет инициализировать рабочих с потоками, что каждый вызов метода work(). Деструктор установил quit_, сигнализирует все потоки и присоединяется к ним.

Тогда ваша быстрая сортировка может быть упрощена:

template <typename T, unsigned WORK_SIZE, typename PORTFOLIO> 
QuickSortTask { 
    std::reference_wrapper<PORTFOLIO> portfolio_; 
    std::reference_wrapper<std::vector<T>> arr_; 
    int start_; 
    int end_; 

    QuickSortTask (PORTFOLIO &p, std::vector<T> &a, int s, int e) 
     : portfolio_(p), arr_(a), start_(s), end_(e) 
     {} 

    void operator()() { 
     if ((end_ - start_) > WORK_SIZE) { 
      int p = partition(arr_, start_, end_); 
      portfolio_.put(QuickSortTask(portfolio_, arr_, start_, p-1)); 
      portfolio_.put(QuickSortTask(portfolio_, arr_, p+1, end_)); 
     } else { 
      regular_quick_sort(arr_, start_, end_); 
     } 
    } 
}; 

К сожалению, этот способ сформулировать параллельный быстрый вид вряд ли произведет большое ускорение. То, что вы хотите сделать, - это распараллеливать задачу разбиения, которая требует, по крайней мере, одного однопроцессорного вычислительного прохода (включая сравнение данных и свопы) до начала распараллеливания.

Возможно, было бы быстрее разделить массив на подматрицы WORK_SIZE, выполнить быстрый сортировку по каждому из них параллельно, а затем объединить результаты для создания отсортированного вектора.

+0

Спасибо вам за ответ! !! – rekrut

 Смежные вопросы

  • Нет связанных вопросов^_^