Вот базовая реализация того, что объяснил Сэм Варшавчик.
Live demo
Причина, почему я добавил local_queue
, чтобы убедиться, что наша m_Mutex
разблокирован сразу. Если вы удалите его, поток, который вызывает push_task
, может блокировать.
Деструктор вызывает stop()
, который устанавливает m_Running
в false
, уведомляет об этом ничью и ждет завершения обработки всех оставшихся задач.
Если рабочий класс умирает, нить тоже умирает, что хорошо.
Мой пример создает только 3 темы и 5 заданий на поток for (int i = 0; i < 5; i++)
, в основном, чтобы убедиться, что все выходные показано на ideone, но я проверил это с 10 нитями и 5000 задач на поток, и он побежал отлично.
Функция do_work
имеет две линии, которые можно раскомментировать, если вы хотите, чтобы выходной поток был правильно синхронизирован. Этот класс поддерживает многопоточность.
Вы можете stop()
и повторно start()
нить столько раз, сколько вам нравится
class Worker
{
public:
Worker(bool start) : m_Running(start) { if (start) private_start(); }
Worker() : m_Running(false) { }
~Worker() { stop(); }
template<typename... Args>
void push_task(Args&&... args)
{
{
std::lock_guard<std::mutex> lk(m_Mutex);
m_Queue.push_back(std::bind(std::forward<Args>(args)...));
}
m_Condition.notify_all();
}
void start()
{
{
std::lock_guard<std::mutex> lk(m_Mutex);
if (m_Running == true) return;
m_Running = true;
}
private_start();
}
void stop()
{
{
std::lock_guard<std::mutex> lk(m_Mutex);
if (m_Running == false) return;
m_Running = false;
}
m_Condition.notify_all();
m_Thread.join();
}
private:
void private_start()
{
m_Thread = std::thread([this]
{
for (;;)
{
decltype(m_Queue) local_queue;
{
std::unique_lock<std::mutex> lk(m_Mutex);
m_Condition.wait(lk, [&] { return !m_Queue.empty() + !m_Running; });
if (!m_Running)
{
for (auto& func : m_Queue)
func();
m_Queue.clear();
return;
}
std::swap(m_Queue, local_queue);
}
for (auto& func : local_queue)
func();
}
});
}
private:
std::condition_variable m_Condition;
std::list<std::function<void()>> m_Queue;
std::mutex m_Mutex;
std::thread m_Thread;
bool m_Running = false;
};
void do_work(unsigned id)
{
//static std::mutex cout_mutex;
//std::lock_guard<std::mutex> lk(cout_mutex);
std::cout << id << std::endl;
}
int main()
{
{
Worker workers[3];
int counter = 0;
for (auto& worker : workers)
worker.start();
for (auto& worker : workers)
{
for (int i = 0; i < 5; i++)
worker.push_task(do_work, ++counter + i);
}
}
std::cout << "finish" << std::endl;
getchar();
return 0;
}
Lookup на реализации пула потоков. Вам понадобится еще один уровень косвенности передачи рабочей нагрузки. –
Вам нужно будет иметь возможность передать работу в ваши потоки (через очередь или другие средства). Это значительно отличается от того, что вы показали. Это было бы хорошим прецедентом для проактора, например 'boost :: asio' – Chad