моей целью является создание простой пользовательской раковины, способной принимать данные из конвейера, которые должны использоваться для разных приложений (запись, трансляция, внутренняя буферизация и т. Д.).QtGstreamer Apps: зависания и медленные/непригодные образцы
С моей первой попытки идея повторной передачи Http (ов)/Udp/etc. перелистывание через Http снова, поэтому я использую souphttpsrc, очередь и QHttp для обслуживания данных одному или нескольким клиентам.
Приложения, похоже, работают, так как я могу запустить конвейер с помощью моей пользовательской раковины (которая по умолчанию просто игнорирует какой-либо образец, пока не будет подключен хотя бы один клиент), поэтому я просто копирую образцы в ответе клиента.
Что действительно странно в том, что скорость загрузки действительно действительно очень медленная (5-10 КБ/с вместо 2-300 КБ/с, как и ожидалось), чем скорость входящего потока, и, кроме того, данные, как представляется, полностью непригодным для использования. Я попытался вставить decodebin в конвейере перед приемником, а скорость достигла 33 Мбит/с, поэтому я исключил бы проблемы с производительностью, вызванные вызовом очереди в методе в основном потоке, ответственным за отправку фактического образца за сеть; даже в этом случае данные кажутся просто мусором.
На втором месте, если я добавлю тройник (или многоцелевой) для параллельного запуска autovideosink и/или autoaudiosink, конвейер застревает при запуске (это происходит не только с двумя автозаменами), и снова дефект то же самое, если я попытаюсь связать только два пользовательских appsink с tee.
Отладка кода кажется, что newSample() никогда не вызывается, когда конвейер зависает, даже если последнее сообщение, полученное на шине, представляет собой изменение состояния воспроизведения пользовательского приемника.
Спасибо!
Вот код:
QGst :: FlowReturn MultiHttpSink :: newSample() {
QGst::SamplePtr sample = pullSample();
char data[sample->buffer()->size()];
qDebug() << "New Sample: size " << sample->buffer()->size();
qDebug() << sample->buffer()->duration().toTime();
qDebug() << sample->buffer()->presentationTimeStamp().toTime();
sample->buffer()->extract(0, &data, sample->buffer()->size());
for (auto it = resources.begin(); it != resources.end(); it++)
QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, data));
return QGst::FlowOk;
}
void Server::setupServer() {
server = new QHttpServer(app);
server->listen(QHostAddress::Any, 8080, [&](QHttpRequest* req, QHttpResponse * res) {
res->setStatusCode(qhttp::ESTATUS_OK); // http status 200
res->addHeader("Content-Type", "video/mp2t");
m_sink.addAudience(res);
});
if (!server->isListening()) {
fprintf(stderr, "failed. can not listen at port 8080!\n");
throw std::exception();
}
void Server::setupPipeline() {
/* source pipeline */
QString pipe1Descr = QString(
"souphttpsrc location=\"%1\" ! "
"queue ! tee name=splitter "
"splitter.! decodebin ! autoaudiosink "
"splitter.! decodebin ! autovideosink "
"splitter.! appsink name=\"mysink\" "
).arg("URL");
pipeline1 = QGst::Parse::launch(pipe1Descr).dynamicCast<QGst::Pipeline>();
m_sink.setElement(pipeline1->getElementByName("mysink"));
QGlib::connect(pipeline1->bus(), "message", this, &Server::onBusMessage);
pipeline1->bus()->addSignalWatch();
/* start playing */
pipeline1->setState(QGst::StatePlaying);
}
QString stateString(QGst::State state) {
switch (state) {
case 0:
return "Void Pending";
case 1:
return "Null";
case 2:
return "Ready";
case 3:
return "Playing";
case 4:
return "Paused";
}
return "";
}
QString streamStateString(QGst::StreamStatusType type) {
switch (type) {
case 0:
return "Create";
case 1:
return "Enter";
case 2:
return "Leave";
case 3:
return "Destroy";
case 4:
return "Start";
case 5:
return "Pause";
case 6:
return "Stop";
}
}
void Server::onBusMessage(const QGst::MessagePtr& message) {
switch (message->type()) {
case QGst::MessageEos:
app->quit();
break;
case QGst::MessageError:
qCritical() << message.staticCast<QGst::ErrorMessage>()->error();
qCritical() << message.staticCast<QGst::ErrorMessage>()->debugMessage();
break;
case QGst::MessageStateChanged:
{
QGlib::RefPointer<QGst::StateChangedMessage> msg = message.staticCast<QGst::StateChangedMessage>();
qDebug() << msg->source()->name() <<": State changed from " << stateString(msg->oldState())
<< " -> " << stateString(msg->newState()) << " Transitioning to " << stateString(msg->pendingState());
if(msg->source()->name().compare("pipeline0")==0){
if(msg->newState() == QGst::StatePaused){
need_reset = true;
playing = false;
} else {
need_reset = false;
if(msg->newState() == QGst::StatePlaying){
playing = true;
}
}
}
break;
}
case QGst::MessageStreamStatus:
{
QGlib::RefPointer<QGst::StreamStatusMessage> msg = message.staticCast<QGst::StreamStatusMessage>();
qDebug() << "Stream Status: " << streamStateString(msg->statusType());
break;
}
default:
qDebug() << "Unhandles Bus Message Type: " << message->typeName();
break;
}
}
void Server::writeToHttpResource(QPointer<qhttp::server::QHttpResponse> res, QByteArray data) {
qDebug() << "Writing data...";
res->write(data);
}
UPDATE
Журнал Отладочная GStreamer докладывает что-то полезное ...
0:00:00.733339246 [335m 8248[00m 0x2747450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
0:00:00.733350762 [335m 8248[00m 0x274a8f0 [37mDEBUG [00m [00m tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f69d000fb80
0:00:00.733353629 [335m 8248[00m 0x2747450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f69d0015120, maxsize:4103 offset:0 size:4096
0:00:00.733361430 [335m 8248[00m 0x274a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f69d000fb80, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 15582, offset_end none, flags 0x0
Размер входящих сетевых пакетов - 4096, но размер буфера составляет 1430 большую часть времени ... исследование .... Я думаю, что я должен получить доступ к буферам по-другому, чтобы получить доступ к необработанным данным.
Размер сетевых пакетов является 1430, 4096, кажется, буферная область ...
0:00:29.316301105 [335m 8248[00m 0x2747450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes
Почему передача происходит медленно и бессмысленно остается загадкой сейчас. Почему он висит слишком!
UPDATE 2
Я также попытался таким образом, (Сообщил и те же размеры) одни и те же результаты:
QGst::FlowReturn MultiHttpSink::newSample() {
QGst::SamplePtr sample = pullSample();
QGst::BufferPtr buffer = sample->buffer();
qDebug() << "New Sample: size " << buffer->size();
QGst::MapInfo map;
if (!buffer->map(map, QGst::MapRead))
return QGst::FlowError;
qDebug() << "New Buffer Map: size " << map.size();
for (auto it = resources.begin(); it != resources.end(); it++)
QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data()));
buffer->unmap(map);
return QGst::FlowOk;
}
Здесь выход последней версии, так же, как и раньше:
... ...
New Sample: size 1430
New Buffer Map: size 1430
QTime("23:34:33.709")
QTime("23:34:33.709")
Writing data...
New Sample: size 4096
New Buffer Map: size 4096
QTime("23:34:33.709")
QTime("23:34:33.709")
Writing data...
... ...
ОБНОВЛЕНИЕ 3
Это бревно между каждым блоком данных:
New Sample: size 1430
New Buffer Map: size 1430
QTime("23:34:33.709")
QTime("23:34:33.709")
0:04:28.221638796 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:3566:gst_base_sink_chain_unlocked:<mysink2>[00m object unref after render 0x7f894400f850
0:04:28.221653137 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<mysink2:sink>[00m called chainfunction &gst_base_sink_chain with buffer 0x7f894400f850, returned ok
0:04:28.221667131 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m tee gsttee.c:778:gst_tee_chain:<splitter>[00m handled buffer ok
0:04:28.221676164 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<splitter:sink>[00m called chainfunction &gst_tee_chain with buffer 0x7f894400f850, returned ok
0:04:28.221687738 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m queue_dataflow gstqueue.c:1482:gst_queue_loop:<queue0>[00m queue is empty
0:04:28.222861204 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
0:04:28.222896720 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944012fe0, maxsize:4103 offset:0 size:4096
0:04:28.222926663 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes
0:04:28.222956032 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2316:gst_base_src_do_sync:<souphttpsrc0>[00m no sync needed
0:04:28.222968263 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2520:gst_base_src_get_range:<souphttpsrc0>[00m buffer ok
0:04:28.222978663 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<queue0:sink>[00m calling chainfunction &gst_queue_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
0:04:28.223004500 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<queue0:sink>[00m called chainfunction &gst_queue_chain with buffer 0x7f894400f630, returned ok
0:04:28.223013700 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m queue_dataflow gstqueue.c:1494:gst_queue_loop:<queue0>[00m queue is not empty
0:04:28.223031507 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2355:gst_base_src_update_length:<souphttpsrc0>[00m reading offset 20790622, length 4096, size -1, segment.stop -1, maxsize -1
0:04:28.223066379 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m basesrc gstbasesrc.c:2456:gst_base_src_get_range:<souphttpsrc0>[00m calling create offset 20790622 length 4096, time 0
0:04:28.223056071 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<splitter:sink>[00m calling chainfunction &gst_tee_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
0:04:28.223111831 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f894400f630
0:04:28.223121952 [335m28768[00m 0xd87450 [37mDEBUG [00m [00m souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615
0:04:28.223139071 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;35m GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0
0:04:28.223152343 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944015120, maxsize:4103 offset:0 size:4096
0:04:28.223179317 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:3409:gst_base_sink_chain_unlocked:<mysink2>[00m got times start: 99:99:99.999999999, end: 99:99:99.999999999
0:04:28.223208511 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:1958:gst_base_sink_get_sync_times:<mysink2>[00m got times start: 99:99:99.999999999, stop: 99:99:99.999999999, do_sync 0
0:04:28.223222869 [335m28768[00m 0xd87450 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944015120
0:04:28.223238437 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;04m default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1)
0:04:28.223281513 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;04m default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1)
0:04:28.223298205 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:3520:gst_base_sink_chain_unlocked:<mysink2>[00m rendering object 0x7f894400f630
0:04:28.223312428 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m basesink gstbasesink.c:946:gst_base_sink_set_last_buffer_unlocked:<mysink2>[00m setting last buffer to 0x7f894400f630
0:04:28.223321131 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;34m GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944014080
0:04:28.223344396 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00;01;34m GST_CAPS gstpad.c:2641:gst_pad_has_current_caps:<mysink2:sink>[00m check current pad caps (NULL)
0:04:28.223355939 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:760:gst_app_sink_render:<mysink2>[00m pushing render buffer 0x7f894400f630 on queue (0)
0:04:28.223369213 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:1286:gst_app_sink_pull_sample:<mysink2>[00m trying to grab a buffer
0:04:28.223377762 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:706:dequeue_buffer:<mysink2>[00m dequeued buffer 0x7f894400f630
0:04:28.223386131 [335m28768[00m 0xd8a8f0 [37mDEBUG [00m [00m appsink gstappsink.c:1301:gst_app_sink_pull_sample:<mysink2>[00m we have a buffer 0x7f894400f630
New Sample: size 1430
New Buffer Map: size 1430
QTime("23:34:33.709")
QTime("23:34:33.709")
Спасибо за повтор, я добавил очередь, как было предложено, но это не проблема, я собираюсь написать решение ниже! – Gianks
Из любопытства, который я хотел знать, это работает для вас, не добавляя очереди, только имея решение, упомянутое ниже. –
Нет, эта часть все еще не работает, конвейер застревает, если я добавлю что-нибудь еще в свои приложения. Я собираюсь открыть новый вопрос о SO. – Gianks