2009-05-29 5 views
8

Я хотел бы создать класс, методы которого можно вызывать из нескольких потоков. но вместо того, чтобы выполнять метод в потоке, из которого он был вызван, он должен выполнять их все в своем потоке. Никакой результат не нужно возвращать, и он не должен блокировать вызывающий поток.Event/Task Queue Multithreading C++

Первая попытка осуществления, которую я включил ниже. Публичные методы вставляют указатель на функцию и данные в очередь заданий, которую затем обрабатывает рабочий поток. Однако это не очень красивый код, и добавление новых методов громоздко.

В идеале я хотел бы использовать это как базовый класс, который я могу легко добавлять методы (с переменным числом аргументов) с минимальным удвоением и удвоением кода.

Что такое лучший способ сделать это? Существует ли какой-либо существующий код, который делает что-то подобное? Благодаря

#include <queue> 

using namespace std; 

class GThreadObject 
{ 
    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     void * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 

private: 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionOneProxy(void * buffer); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 



#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* It should block on a condition, but Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    //Execute the function pointer with the attached data 
    (*this.*receivedEvent->funcPtr)(receivedEvent->data); 
} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 

    //Malloc an object the size of the function arguments 
    int argumentSize = sizeof(char*)+sizeof(int); 
    void * myData = malloc(argumentSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, &argOne, argumentSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = myData; 
    myEvent->funcPtr = &GThreadObject::functionOneProxy; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

/* 
* This is the function I would like to remove if possible 
* Split the void * buffer into arguments for the internal Function 
*/ 
void GThreadObject::functionOneProxy(void * buffer) 
{ 
    char * cBuff = (char*)buffer; 
    functionOneInternal((char*)*((unsigned int*)cBuff), (int)*(cBuff+sizeof(char*))); 
}; 

int main() 
{ 
    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 

    return 0; 
} 

ответ

6

В библиотеке Futures пробивается Boost и стандартная библиотека C++. В ACE есть что-то подобное, но я бы не рекомендовал его кому-либо (как уже указывал @lothar, это активный объект.)

+0

Я искал boost :: фьючерсы, но поскольку он не является частью выпущенной версии boost, мне пришлось вернуться к моей доверенной ACE :-) – lothar

+0

Библиотека фьючерсов будет частью Boost 1.41. Он также доступен как часть моей реализации библиотеки потоков C++ 0x по адресу http://www.stdthread.co.uk –

+0

Спасибо, Энтони. Приятно слышать от вас :) –

2

В библиотеке POCO есть что-то по тем же линиям, что и ActiveMethod (наряду с некоторые связанные функции, например ActiveResult) в секции потоковой передачи. Исходный код легко доступен и понятен.

1

Для расширяемости и ремонтопригодности (и других возможностей) вы можете определить абстрактный класс (или интерфейс) для «задания», которое должен выполнять поток. Тогда пользователь (ы) вашего пула потоков реализует этот интерфейс и дает ссылку на объект в пуле потоков. Это очень похоже на Symbian Active Object design: все подклассы AO CActive и должны реализовать такие методы, как Run() и Cancel().

Для простоты ваш интерфейс (абстрактный класс) может быть столь же просто, как:

class IJob 
{ 
    virtual Run()=0; 
}; 

Затем пул потоков или одного потока прием запросов будет иметь что-то вроде:

class CThread 
{ 
    <...> 
public: 
    void AddJob(IJob* iTask); 
    <...> 
}; 

Естественно вы бы имеют несколько задач, которые могут иметь всевозможные дополнительные сеттеры/геттеры/атрибуты и все, что вам нужно в любой жизни. Однако единственным обязательным является реализация метода Run(), который будет выполнять длительные расчеты:

class CDumbLoop : public IJob 
{ 
public: 
    CDumbJob(int iCount) : m_Count(iCount) {}; 
    ~CDumbJob() {}; 
    void Run() 
    { 
     // Do anything you want here 
    } 
private: 
    int m_Count; 
}; 
+0

Это точный метод, который мы используем для разработки игр в системах следующего поколения в моей компании (для дополнительного бонуса производительности, посмотрите на добавление элементов в рабочую очередь с помощью методов блокировки) –

+0

Любые советы при реализации незаблокированной очереди? –

+0

Существует довольно много хороших примеров незакрепленных очередей в сети (это одна из самых простых структур без блокировки для реализации). Это тот, на который я нарезал свои зубы, когда попал на бесконтактную подножку: http://www.boyet.com/Articles/LockfreeQueue.html –

2

Вы можете решить эту проблему с помощью -library темы подталкивание в. Что-то вроде этого (половинной псевдо):


class GThreadObject 
{ 
     ... 

     public: 
       GThreadObject() 
       : _done(false) 
       , _newJob(false) 
       , _thread(boost::bind(&GThreadObject::workerThread, this)) 
       { 
       } 

       ~GThreadObject() 
       { 
         _done = true; 

         _thread.join(); 
       } 

       void functionOne(char *argOne, int argTwo) 
       { 
         ... 

         _jobQueue.push(myEvent); 

         { 
           boost::lock_guard l(_mutex); 

           _newJob = true; 
         } 

         _cond.notify_one(); 
       } 

     private: 
       void workerThread() 
       { 
         while (!_done) { 
           boost::unique_lock l(_mutex); 

           while (!_newJob) { 
             cond.wait(l); 
           } 

           Event *receivedEvent = _jobQueue.front(); 

           ... 
         } 
       } 

     private: 
       volatile bool    _done; 
       volatile bool    _newJob; 
       boost::thread    _thread; 
       boost::mutex    _mutex; 
       boost::condition_variable _cond; 
       std::queue<Event*>  _jobQueue; 
}; 

Кроме того, обратите внимание, как RAII позволяют получить этот код меньше и лучше управлять.

+1

Является std :: queue :: push threadsafe? Кажется, что functionOne lock_guard должен идти до вызова _jobQueue.push. –

1

Вот класс, который я написал для аналогичной цели (я использую его для обработки событий, но вы, конечно же, можете переименовать его в ActionQueue - и переименуйте его методы).

Вы можете использовать его как это:

С функцией вы хотите позвонить: void foo (const int x, const int y) { /*...*/ }

И: EventQueue q;

q.AddEvent (повышающее :: Bind (Foo, 10, 20));

В рабочем потоке

q.PlayOutEvents();

Примечание: для предотвращения использования циклов ЦП достаточно просто добавить код для блокировки.

код (Visual Studio 2003 с усилением 1.34.1):

#pragma once 

#include <boost/thread/recursive_mutex.hpp> 
#include <boost/function.hpp> 
#include <boost/signals.hpp> 
#include <boost/bind.hpp> 
#include <boost/foreach.hpp> 
#include <string> 
using std::string; 


// Records & plays out actions (closures) in a safe-thread manner. 

class EventQueue 
{ 
    typedef boost::function <void()> Event; 

public: 

    const bool PlayOutEvents() 
    { 
     // The copy is there to ensure there are no deadlocks. 
     const std::vector<Event> eventsCopy = PopEvents(); 

     BOOST_FOREACH (const Event& e, eventsCopy) 
     { 
      e(); 
      Sleep (0); 
     } 

     return eventsCopy.size() > 0; 
    } 

    void AddEvent (const Event& event) 
    { 
     Mutex::scoped_lock lock (myMutex); 

     myEvents.push_back (event); 
    } 

protected: 

    const std::vector<Event> PopEvents() 
    { 
     Mutex::scoped_lock lock (myMutex); 

     const std::vector<Event> eventsCopy = myEvents; 
     myEvents.clear(); 

     return eventsCopy; 
    } 

private: 

    typedef boost::recursive_mutex Mutex; 
    Mutex myMutex; 

    std::vector <Event> myEvents; 

}; 

Я надеюсь, что это помогает. :)

Martin Bilski

+0

Я настоятельно рекомендую использовать мьютексы (или любую другую форму «примитива синхронизации»), поскольку они не масштабируются вообще с несколькими процессорами (примерно через 4-8 они фактически уменьшают производительность). Посмотрите на кодирование без блокировки для действительно масштабируемых реализаций. Кроме того, если вам нужно использовать примитив синхронизации, используйте критический раздел, поскольку они быстрее, чем мьютекс (мьютекс является безопасным процессом, критический раздел является потокобезопасным, т. Е. Использует мьютекс при синхронизации между процессами, CS при синхронизации потоков в одном и том же обработать) –

0

Вы должны взглянуть на Boost, ASIO библиотеки. Он предназначен для отправки событий асинхронно. Он может быть сопряжен с библиотекой Boost Thread для создания описанной вами системы.

Вам необходимо создать экземпляр одного объекта boost::asio::io_service и запланировать серию асинхронных событий (boost::asio::io_service::post10 или boost::asio::io_service::dispatch). Затем вы вызываете функцию-член run от n тем. Объект io_service является потокобезопасным и гарантирует, что ваши асинхронные обработчики будут отправляться только в потоке, из которого вы вызвали io_service::run.

Объект boost::asio::strand также полезен для простой синхронизации потоков.

Для чего я считаю, что библиотека ASIO является очень изящным решением этой проблемы.

1

Ниже приведена реализация, которая не требует метода «functionProxy». Несмотря на то, что добавлять новые методы проще, это все еще беспорядочно.

Boost :: Bind и «Futures» действительно выглядят так, будто они убирают много этого. Наверное, я посмотрю код форсирования и посмотрю, как он работает. Спасибо за ваши предложения всем.

GThreadObject.h

#include <queue> 

using namespace std; 

class GThreadObject 
{ 

    template <int size> 
    class VariableSizeContainter 
    { 
     char data[size]; 
    }; 

    class event 
    { 
     public: 
     void (GThreadObject::*funcPtr)(void *); 
     int dataSize; 
     char * data; 
    }; 

public: 
    void functionOne(char * argOne, int argTwo); 
    void functionTwo(int argTwo, int arg2); 


private: 
    void newEvent(void (GThreadObject::*)(void*), unsigned int argStart, int argSize); 
    void workerThread(); 
    queue<GThreadObject::event*> jobQueue; 
    void functionTwoInternal(int argTwo, int arg2); 
    void functionOneInternal(char * argOne, int argTwo); 

}; 

GThreadObject.cpp

#include <iostream> 
#include "GThreadObject.h" 

using namespace std; 

/* On a continuous loop, reading tasks from queue 
* When a new event is received it executes the attached function pointer 
* Thread code removed to decrease clutter 
*/ 
void GThreadObject::workerThread() 
{ 
    //New Event added, process it 
    GThreadObject::event * receivedEvent = jobQueue.front(); 

    /* Create an object the size of the stack the function is expecting, then cast the function to accept this object as an argument. 
    * This is the bit i would like to remove 
    * Only supports 8 byte argument size e.g 2 int's OR pointer + int OR myObject8bytesSize 
    * Subsequent data sizes would need to be added with an else if 
    * */ 
    if (receivedEvent->dataSize == 8) 
    { 
     const int size = 8; 

     void (GThreadObject::*newFuncPtr)(VariableSizeContainter<size>); 
     newFuncPtr = (void (GThreadObject::*)(VariableSizeContainter<size>))receivedEvent->funcPtr; 

     //Execute the function 
     (*this.*newFuncPtr)(*((VariableSizeContainter<size>*)receivedEvent->data)); 
    } 

    //Clean up 
    free(receivedEvent->data); 
    delete receivedEvent; 

} 

void GThreadObject::newEvent(void (GThreadObject::*funcPtr)(void*), unsigned int argStart, int argSize) 
{ 

    //Malloc an object the size of the function arguments 
    void * myData = malloc(argSize); 
    //Copy the data passed to this function into the buffer 
    memcpy(myData, (char*)argStart, argSize); 

    //Create the event and push it on to the queue 
    GThreadObject::event * myEvent = new event; 
    myEvent->data = (char*)myData; 
    myEvent->dataSize = argSize; 
    myEvent->funcPtr = funcPtr; 
    jobQueue.push(myEvent); 

    //This would be send a thread condition signal, replaced with a simple call here 
    this->workerThread(); 

} 

/* 
* This is the public interface, Can be called from child threads 
* Instead of executing the event directly it adds it to a job queue 
* Then the workerThread picks it up and executes all tasks on the same thread 
*/ 
void GThreadObject::functionOne(char * argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionOneInternal, (unsigned int)&argOne, sizeof(char*)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionOneInternal(char * argOne, int argTwo) 
{ 
    cout << "We've made it to functionOne Internal char*:" << argOne << " int:" << argTwo << endl; 

    //Now do the work 
} 

void GThreadObject::functionTwo(int argOne, int argTwo) 
{ 
    newEvent((void (GThreadObject::*)(void*))&GThreadObject::functionTwoInternal, (unsigned int)&argOne, sizeof(int)+sizeof(int)); 
} 

/* 
* This handles the actual event 
*/ 
void GThreadObject::functionTwoInternal(int argOne, int argTwo) 
{ 
    cout << "We've made it to functionTwo Internal arg1:" << argOne << " int:" << argTwo << endl; 
} 

main.cpp

#include <iostream> 
#include "GThreadObject.h" 

int main() 
{ 

    GThreadObject myObj; 

    myObj.functionOne("My Message", 23); 
    myObj.functionTwo(456, 23); 


    return 0; 
} 

Edit: Просто для полноты я сделал реализацию с усилением :: затруднительное.Основные отличия:

queue<boost::function<void()> > jobQueue; 

void GThreadObjectBoost::functionOne(char * argOne, int argTwo) 
{ 
    jobQueue.push(boost::bind(&GThreadObjectBoost::functionOneInternal, this, argOne, argTwo)); 

    workerThread(); 
} 

void GThreadObjectBoost::workerThread() 
{ 
    boost::function<void()> func = jobQueue.front(); 
    func(); 
} 

Использование реализации наддува для 10000000 Итерации functionOne() он принял ~ 19sec. Однако невозбужденная реализация заняла всего ~ 6.5 секунд. Так что примерно на 3 раза медленнее. Я предполагаю, что найти хорошую неблокирующуюся очередь будет самой большой шейкой бутылки производительности здесь. Но это все еще очень большая разница.

 Смежные вопросы

  • Нет связанных вопросов^_^