Не уверен, что я получил идею на 100% правильно, но почему бы не передать массив рабочих потоков в суперпоток, сохранить индекс, который обозначает индекс текущего активного потока, подключать сигналы только к этому, отправлять сигналы при необходимости, ждать завершения, отключать сигналы, продвигать индекс и повторять? Если отправка сериализованных сигналов в потоки - это то, что вы действительно хотите, чтобы это могло сработать.
EDIT
Хорошо я фактически вытащил себя togetger сделать на основе образца Qt, который реализует необходимый рабочий процесс и поставить его на github.
#pragma once
#include <QThread>
#include <QApplication>
#include <QMetaType>
#include <QTimer>
#include <vector>
#include <memory>
#include <cstdio>
#include <algorithm>
struct Work
{
int m_work;
};
struct Result
{
int m_result;
int m_workerIndex;
};
Q_DECLARE_METATYPE(Work);
Q_DECLARE_METATYPE(Result);
class Worker : public QThread
{
Q_OBJECT
public:
Worker(int workerIndex) : m_workerIndex(workerIndex)
{
moveToThread(this);
connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work)));
printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex);
}
void DispatchWork(Work work)
{
emit WorkReceived(work);
}
public slots:
void DoWork(Work work)
{
printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work);
msleep(100);
Result result = { work.m_work * 2, m_workerIndex };
emit WorkDone(result);
}
signals:
void WorkReceived(Work work);
void WorkDone(Result result);
private:
int m_workerIndex;
};
class Master : public QObject
{
Q_OBJECT
public:
Master(int workerCount) : m_activeWorker(0), m_workerCount(workerCount)
{
printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
}
~Master()
{
std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker)
{
worker->quit();
worker->wait();
});
}
public slots:
void Initialize()
{
printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId()));
for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex)
{
auto worker = new Worker(workerIndex);
m_workers.push_back(std::move(std::unique_ptr<Worker>(worker)));
connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result)));
worker->start();
}
m_timer = new QTimer();
m_timer->setInterval(500);
connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork()));
m_timer->start();
}
void GenerateWork()
{
Work work = { m_activeWorker };
printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker);
m_workers[m_activeWorker]->DispatchWork(work);
m_activeWorker = ++m_activeWorker % m_workers.size();
}
void WorkDone(Result result)
{
printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex);
}
void Terminate()
{
m_timer->stop();
delete m_timer;
}
private:
int m_workerCount;
std::vector<std::unique_ptr<Worker>> m_workers;
int m_activeWorker;
QTimer* m_timer;
};
QtThreadExample.cpp:
#include "QtThreadExample.hpp"
#include <QTimer>
int main(int argc, char** argv)
{
qRegisterMetaType<Work>("Work");
qRegisterMetaType<Result>("Result");
QApplication application(argc, argv);
QThread masterThread;
Master master(5);
master.moveToThread(&masterThread);
master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize()));
master.connect(&masterThread, SIGNAL(terminated()), SLOT(Terminate()));
masterThread.start();
// Set a timer to terminate the program after 10 seconds
QTimer::singleShot(10 * 1000, &application, SLOT(quit()));
application.exec();
masterThread.quit();
masterThread.wait();
printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId()));
return 0;
}
В целом решение фактически не испускает сигнал от самого главного потока, но излучает сигнал от рабочего потока - таким образом, вы получите уникальные сигнал для каждого потока и может обрабатывать работу асинхронно в цикле событий и затем возвращать сигнал о том, что поток выполнен. Образец может и должен быть реорганизован в соответствии с вашими потребностями, но в целом он демонстрирует попытку производителя/потребителя в Qt, используя идею с индексами и сигнальными потоками один за другим. Я использую общий таймер для генерации работы в главном потоке (Master::m_timer
). Думаю, в вашем случае вы будете генерировать работу с сигналом, поступающим из сокета, файла или чего-то еще. Затем я вызываю метод в потоке активного работника, и этот метод излучает сигнал в цикле событий рабочего потока, чтобы начать выполнение работы, а затем испустить сигнал о завершении. Это общее описание, посмотрите образец, попробуйте, и если у вас есть какие-либо последующие вопросы, дайте мне знать.
Я думаю, это работает довольно хорошо, если вы используете объекты Qt, но в традиционном смысле шаблонов потребительского/продюсера материал сигнала/слота фактически делает жизнь немного сложнее, стандартный конвейер C++ 11 с std::condition_variable
и вызов главного потока condition_variable::notify_one(), а рабочие потоки, просто ожидающие переменную условия, будут проще, однако Qt имеет хорошие обертки для всех материалов ввода-вывода. Поэтому просто попробуйте это и решите.
Ниже приведен пример вывода примера, я предполагаю, что протоколирование нить указывает на то, что требуется эффект достигается:
еще одно замечание, поскольку QApplication
запускает цикл событий себя, если вы не имеете GUI, вы можете позволить всем вашим объектам ввода/вывода и мастер-классу жить в основном потоке и издавать сигналы оттуда, тем самым устраняя необходимость в отдельном главном потоке. Конечно, если у вас есть графический интерфейс, вы не захотите обременять его этим материалом.
Sry, но я думаю, что это путь к широкому для SO. – Bowdzone
Sry, отредактированный, надеясь, что это будет немного более конкретным. – yonobi