Следующая полезная, рудиментарная реализация. Он использует внутреннюю пару сигнальных слотов для перемещения данных на другую конечную точку. Таким образом, любой конец соединения может жить в любом потоке, а концы могут перемещаться между потоками без потери данных или вызывать любые расы.
Частный QRingBuffer
используется вместо изобретательства колеса. Добавьте QT += core-private
в файл .pro
, чтобы сделать его доступным.
При инициализации без другой конечной точки необходимо передать явный nullptr
в качестве первого аргумента конструктору. Это помогает избежать путаницы между конечной точкой и родителем объекта.
Если вы хотите создать экземпляр открытой трубы, как это часто бывает, вы можете передать режим ввода-вывода конструктору. Типичное использование:
int main(/*…*/)
{
/*…*/
AppPipe end1 { nullptr, QIODevice::ReadWrite };
AppPipe end2 { &end1, QIODevice::ReadWrite };
// the pipes are open ready to use
/*…*/
}
Все, что вы пишете на одну трубу, заканчивается как читаемые данные в другом, и наоборот. Все семантика QIODevice
- вы можете подключиться к сигналу readyRead
, использовать трубу с QDataStream
или QTextStream
и т. Д. Как и в случае с любым QIODevice
, вы можете использовать этот класс только с его thread()
, но другая конечная точка может жить в любом нить, и оба могут перемещаться между потоками по желанию без потери данных.
Если другой конец трубы не является открытым и читаемым, записи не являются операционными системами, даже если они преуспевают. Закрытие трубы очищает буфер чтения, так что его можно повторно открыть для повторного использования.
Сигналы hasIncoming
и hasOutgoing
полезны в monitoring данных, проходящих по трубе.
// https://github.com/KubaO/stackoverflown/tree/master/questions/local-pipe-32317081
#include <QtCore>
#include <private/qringbuffer_p.h>
/// A simple point-to-point intra-process pipe. The other endpoint can live in any
/// thread.
class AppPipe : public QIODevice {
Q_OBJECT
QRingBuffer m_buf;
Q_SLOT void _a_write(const QByteArray & data) {
if (! openMode() & QIODevice::ReadOnly) return; // We must be readable.
m_buf.append(data);
emit hasIncoming(data);
emit readyRead();
}
public:
AppPipe(AppPipe * other, QIODevice::OpenMode mode, QObject * parent = {}) :
AppPipe(other, parent) {
open(mode);
}
AppPipe(AppPipe * other = {}, QObject * parent = {}) : QIODevice(parent) {
addOther(other);
}
void addOther(AppPipe * other) {
if (other) {
connect(this, &AppPipe::hasOutgoing, other, &AppPipe::_a_write, Qt::UniqueConnection);
connect(other, &AppPipe::hasOutgoing, this, &AppPipe::_a_write, Qt::UniqueConnection);
}
}
void removeOther(AppPipe * other) {
disconnect(this, &AppPipe::hasOutgoing, other, &AppPipe::_a_write);
disconnect(other, &AppPipe::hasOutgoing, this, &AppPipe::_a_write);
}
void close() override {
QIODevice::close();
m_buf.clear();
}
qint64 writeData(const char * data, qint64 maxSize) override {
if (maxSize > 0)
hasOutgoing(QByteArray(data, maxSize));
return maxSize;
}
qint64 readData(char * data, qint64 maxLength) override {
return m_buf.read(data, maxLength);
}
qint64 bytesAvailable() const override {
return m_buf.size() + QIODevice::bytesAvailable();
}
bool canReadLine() const override {
return QIODevice::canReadLine() || m_buf.canReadLine();
}
bool isSequential() const override { return true; }
Q_SIGNAL void hasOutgoing(const QByteArray &);
Q_SIGNAL void hasIncoming(const QByteArray &);
};
# local-pipe-32317081.pro
QT = core-private
TARGET = local-pipe-32317081
CONFIG += c++11
TEMPLATE = app
SOURCES = main.cpp