2015-04-30 5 views
1

Я действительно не уверен, как решить эту проблему, поэтому я объясню это в первую очередь. Мне нужно запустить несколько потоков, каждый из которых подключается к некоторому приложению через TCPSocket, до сих пор не проблема. Приложение довольно трудоемкое, поэтому я хочу, чтобы он выполнял параллельную работу по нескольким потокам и по одному потоку для связи с ним. Как только вычисление закончено, я хочу отправить результаты в другой поток, собирая результаты. Therefor я написал класс Worker:Qt многопоточность QThreads, которые поддерживают TCP-соединение и повторно используются

class Worker : public QObject { 
    Q_OBJECT 

public: 
    Worker(); 
    Worker(int port); 
    ~Worker(); 
    QTcpSocket* sock; 
    void insert(); 

public slots: 
    void connect(); 
    void process(const int &id, const QString &param, const int &arity); 

signals: 
    void ready(); 
    void finished(const int &id, const int &consistent, const QString &result); 
    void error(QString err); 
}; 

Теперь, когда superThread предполагается отрабатывать огромный файл и нужно распространять ее вокруг нитей, а затем получать и обрабатывать результаты. Мой подход до сих пор является еще superThread, что связано в основном() следующим образом:

QThread* superThread = new QThread(); 
supWorker* super = new supWorker(); 
for (int i = 0; i < nrWorkers; i++){ 
    Worker* worker = new Worker(portRange+i); 
    QThread* workerThread = new QThread(); 
    QThread::connect(workerThread, SIGNAL(started()), worker, SLOT(connect())); 
    worker->moveToThread(workerThread); 
    workerThread->start(); 
    QThread::connect(super, SIGNAL(process(int, QString, int)), worker, SLOT(process(int,QString,int))); 
    QThread::connect(worker, SIGNAL(finished(int, int, QString)), super, SLOT(handleResult(int, int, QString))); 
} 

Проблема таким образом, очевидно, что я могу отправить только сигнал для всех связанных нитей. То, что я хочу, чтобы superThread выполнял, - это отправлять аргументы только одному из потоков. Я не знаю, как я могу обрабатывать соединение, чтобы получить только один из рабочих потоков?

Любые помощь или архитектурные идеи очень ценятся, спасибо заранее.

+0

Sry, но я думаю, что это путь к широкому для SO. – Bowdzone

+0

Sry, отредактированный, надеясь, что это будет немного более конкретным. – yonobi

ответ

2

Не уверен, что я получил идею на 100% правильно, но почему бы не передать массив рабочих потоков в суперпоток, сохранить индекс, который обозначает индекс текущего активного потока, подключать сигналы только к этому, отправлять сигналы при необходимости, ждать завершения, отключать сигналы, продвигать индекс и повторять? Если отправка сериализованных сигналов в потоки - это то, что вы действительно хотите, чтобы это могло сработать.

EDIT

Хорошо я фактически вытащил себя togetger сделать на основе образца Qt, который реализует необходимый рабочий процесс и поставить его на github.

#pragma once 

#include <QThread> 
#include <QApplication> 
#include <QMetaType> 
#include <QTimer> 

#include <vector> 
#include <memory> 
#include <cstdio> 
#include <algorithm> 

struct Work 
{ 
    int m_work; 
}; 

struct Result 
{ 
    int m_result; 
    int m_workerIndex; 
}; 

Q_DECLARE_METATYPE(Work); 
Q_DECLARE_METATYPE(Result); 

class Worker : public QThread 
{ 
    Q_OBJECT 

public: 
    Worker(int workerIndex) : m_workerIndex(workerIndex) 
    { 
     moveToThread(this); 
     connect(this, SIGNAL(WorkReceived(Work)), SLOT(DoWork(Work))); 
     printf("[%d]: worker %d initialized\n", reinterpret_cast<int>(currentThreadId()), workerIndex); 
    } 

    void DispatchWork(Work work) 
    { 
     emit WorkReceived(work); 
    } 

public slots: 
    void DoWork(Work work) 
    { 
     printf("[%d]: worker %d received work %d\n", reinterpret_cast<int>(currentThreadId()), m_workerIndex, work.m_work); 
     msleep(100); 
     Result result = { work.m_work * 2, m_workerIndex }; 
     emit WorkDone(result); 
    } 

signals: 
    void WorkReceived(Work work); 
    void WorkDone(Result result); 

private: 
    int m_workerIndex; 
}; 

class Master : public QObject 
{ 
    Q_OBJECT 

public: 
    Master(int workerCount) : m_activeWorker(0), m_workerCount(workerCount) 
    { 
     printf("[%d]: creating master thread\n", reinterpret_cast<int>(QThread::currentThreadId())); 
    } 
    ~Master() 
    { 
     std::for_each(m_workers.begin(), m_workers.end(), [](std::unique_ptr<Worker>& worker) 
     { 
      worker->quit(); 
      worker->wait(); 
     }); 
    } 


public slots: 
    void Initialize() 
    { 
     printf("[%d]: initializing master thread\n", reinterpret_cast<int>(QThread::currentThreadId())); 
     for (int workerIndex = 0; workerIndex < m_workerCount; ++workerIndex) 
     { 
      auto worker = new Worker(workerIndex); 
      m_workers.push_back(std::move(std::unique_ptr<Worker>(worker))); 
      connect(worker, SIGNAL(WorkDone(Result)), SLOT(WorkDone(Result))); 
      worker->start(); 
     } 
     m_timer = new QTimer(); 
     m_timer->setInterval(500); 
     connect(m_timer, SIGNAL(timeout()), SLOT(GenerateWork())); 
     m_timer->start(); 
    } 
    void GenerateWork() 
    { 
     Work work = { m_activeWorker }; 
     printf("[%d]: dispatching work %d to worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), work.m_work, m_activeWorker); 
     m_workers[m_activeWorker]->DispatchWork(work); 
     m_activeWorker = ++m_activeWorker % m_workers.size(); 
    } 
    void WorkDone(Result result) 
    { 
     printf("[%d]: received result %d from worker %d\n", reinterpret_cast<int>(QThread::currentThreadId()), result.m_result, result.m_workerIndex); 
    } 
    void Terminate() 
    { 
     m_timer->stop(); 
     delete m_timer; 
    } 

private: 
    int m_workerCount; 
    std::vector<std::unique_ptr<Worker>> m_workers; 
    int m_activeWorker; 
    QTimer* m_timer; 
}; 

QtThreadExample.cpp:

#include "QtThreadExample.hpp" 
#include <QTimer> 

int main(int argc, char** argv) 
{ 
    qRegisterMetaType<Work>("Work"); 
    qRegisterMetaType<Result>("Result"); 
    QApplication application(argc, argv); 
    QThread masterThread; 
    Master master(5); 
    master.moveToThread(&masterThread); 
    master.connect(&masterThread, SIGNAL(started()), SLOT(Initialize())); 
    master.connect(&masterThread, SIGNAL(terminated()), SLOT(Terminate())); 
    masterThread.start(); 
    // Set a timer to terminate the program after 10 seconds 
    QTimer::singleShot(10 * 1000, &application, SLOT(quit())); 
    application.exec(); 
    masterThread.quit(); 
    masterThread.wait(); 
    printf("[%d]: master thread has finished\n", reinterpret_cast<int>(QThread::currentThreadId())); 
    return 0; 
} 

В целом решение фактически не испускает сигнал от самого главного потока, но излучает сигнал от рабочего потока - таким образом, вы получите уникальные сигнал для каждого потока и может обрабатывать работу асинхронно в цикле событий и затем возвращать сигнал о том, что поток выполнен. Образец может и должен быть реорганизован в соответствии с вашими потребностями, но в целом он демонстрирует попытку производителя/потребителя в Qt, используя идею с индексами и сигнальными потоками один за другим. Я использую общий таймер для генерации работы в главном потоке (Master::m_timer). Думаю, в вашем случае вы будете генерировать работу с сигналом, поступающим из сокета, файла или чего-то еще. Затем я вызываю метод в потоке активного работника, и этот метод излучает сигнал в цикле событий рабочего потока, чтобы начать выполнение работы, а затем испустить сигнал о завершении. Это общее описание, посмотрите образец, попробуйте, и если у вас есть какие-либо последующие вопросы, дайте мне знать.

Я думаю, это работает довольно хорошо, если вы используете объекты Qt, но в традиционном смысле шаблонов потребительского/продюсера материал сигнала/слота фактически делает жизнь немного сложнее, стандартный конвейер C++ 11 с std::condition_variable и вызов главного потока condition_variable::notify_one(), а рабочие потоки, просто ожидающие переменную условия, будут проще, однако Qt имеет хорошие обертки для всех материалов ввода-вывода. Поэтому просто попробуйте это и решите.

Ниже приведен пример вывода примера, я предполагаю, что протоколирование нить указывает на то, что требуется эффект достигается: enter image description here

еще одно замечание, поскольку QApplication запускает цикл событий себя, если вы не имеете GUI, вы можете позволить всем вашим объектам ввода/вывода и мастер-классу жить в основном потоке и издавать сигналы оттуда, тем самым устраняя необходимость в отдельном главном потоке. Конечно, если у вас есть графический интерфейс, вы не захотите обременять его этим материалом.

+0

Спасибо за ваш ответ и извините за запутанный вопрос, я отредактировал свой пост, надеясь, что моя проблема станет немного понятнее. Потому что я не знаю, как правильно отправлять сигналы на конкретный и только один из рабочих потоков. – yonobi

+0

@yonobi, как я уже упоминал, одной из возможностей является подключение только тех/тех потоков, которые вы хотите получить. А затем выполните отключение/пересоединение по мере необходимости. –

+0

@yonobi, здесь (https://forum.qt.io/topic/12999/send-signals-to-a-specified-receiver/2) является предметом обсуждения на форуме qt, поскольку указано, что вы хотите, это своего рода против идиомы слота сигнала как таковой. Если вы хотите вызывать явный поток, почему бы просто не сделать метод, который вы можете вызвать напрямую, и это, например, просыпает поток из ожидания и выполняет некоторую работу? Вам нужно иметь кросс-платформенное решение? –

 Смежные вопросы

  • Нет связанных вопросов^_^