2016-06-09 9 views
1

моей целью является создание простой пользовательской раковины, способной принимать данные из конвейера, которые должны использоваться для разных приложений (запись, трансляция, внутренняя буферизация и т. Д.).QtGstreamer Apps: зависания и медленные/непригодные образцы

С моей первой попытки идея повторной передачи Http (ов)/Udp/etc. перелистывание через Http снова, поэтому я использую souphttpsrc, очередь и QHttp для обслуживания данных одному или нескольким клиентам.

Приложения, похоже, работают, так как я могу запустить конвейер с помощью моей пользовательской раковины (которая по умолчанию просто игнорирует какой-либо образец, пока не будет подключен хотя бы один клиент), поэтому я просто копирую образцы в ответе клиента.

Что действительно странно в том, что скорость загрузки действительно действительно очень медленная (5-10 КБ/с вместо 2-300 КБ/с, как и ожидалось), чем скорость входящего потока, и, кроме того, данные, как представляется, полностью непригодным для использования. Я попытался вставить decodebin в конвейере перед приемником, а скорость достигла 33 Мбит/с, поэтому я исключил бы проблемы с производительностью, вызванные вызовом очереди в методе в основном потоке, ответственным за отправку фактического образца за сеть; даже в этом случае данные кажутся просто мусором.

На втором месте, если я добавлю тройник (или многоцелевой) для параллельного запуска autovideosink и/или autoaudiosink, конвейер застревает при запуске (это происходит не только с двумя автозаменами), и снова дефект то же самое, если я попытаюсь связать только два пользовательских appsink с tee.

Отладка кода кажется, что newSample() никогда не вызывается, когда конвейер зависает, даже если последнее сообщение, полученное на шине, представляет собой изменение состояния воспроизведения пользовательского приемника.

Спасибо!

Вот код:

QGst :: FlowReturn MultiHttpSink :: newSample() {

 QGst::SamplePtr sample = pullSample(); 
     char data[sample->buffer()->size()]; 
     qDebug() << "New Sample: size " << sample->buffer()->size(); 
     qDebug() << sample->buffer()->duration().toTime(); 
     qDebug() << sample->buffer()->presentationTimeStamp().toTime(); 

     sample->buffer()->extract(0, &data, sample->buffer()->size()); 
     for (auto it = resources.begin(); it != resources.end(); it++) 
      QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, data)); 

     return QGst::FlowOk; 
    } 


    void Server::setupServer() { 
     server = new QHttpServer(app); 
     server->listen(QHostAddress::Any, 8080, [&](QHttpRequest* req, QHttpResponse * res) { 

      res->setStatusCode(qhttp::ESTATUS_OK); // http status 200 
      res->addHeader("Content-Type", "video/mp2t"); 

      m_sink.addAudience(res); 
     }); 

     if (!server->isListening()) { 
      fprintf(stderr, "failed. can not listen at port 8080!\n"); 
      throw std::exception(); 
     } 

    void Server::setupPipeline() { 

     /* source pipeline */ 
     QString pipe1Descr = QString(
          "souphttpsrc location=\"%1\" ! " 
       "queue ! tee name=splitter " 
       "splitter.! decodebin ! autoaudiosink " 
       "splitter.! decodebin ! autovideosink " 
       "splitter.! appsink name=\"mysink\" " 
          ).arg("URL"); 
     pipeline1 = QGst::Parse::launch(pipe1Descr).dynamicCast<QGst::Pipeline>(); 
     m_sink.setElement(pipeline1->getElementByName("mysink")); 

     QGlib::connect(pipeline1->bus(), "message", this, &Server::onBusMessage); 
     pipeline1->bus()->addSignalWatch(); 
     /* start playing */ 
     pipeline1->setState(QGst::StatePlaying); 

    } 



    QString stateString(QGst::State state) { 
     switch (state) { 
      case 0: 
       return "Void Pending"; 
      case 1: 
       return "Null"; 
      case 2: 
       return "Ready"; 
      case 3: 
       return "Playing"; 
      case 4: 
       return "Paused"; 
     } 
     return ""; 
    } 

    QString streamStateString(QGst::StreamStatusType type) { 
     switch (type) { 
      case 0: 
       return "Create"; 
      case 1: 
       return "Enter"; 
      case 2: 
       return "Leave"; 
      case 3: 
       return "Destroy"; 
      case 4: 
       return "Start"; 
      case 5: 
       return "Pause"; 
      case 6: 
       return "Stop"; 
     } 
    } 

    void Server::onBusMessage(const QGst::MessagePtr& message) { 

     switch (message->type()) { 
      case QGst::MessageEos: 
       app->quit(); 
       break; 
      case QGst::MessageError: 
       qCritical() << message.staticCast<QGst::ErrorMessage>()->error(); 
       qCritical() << message.staticCast<QGst::ErrorMessage>()->debugMessage(); 
       break; 
      case QGst::MessageStateChanged: 
      { 
       QGlib::RefPointer<QGst::StateChangedMessage> msg = message.staticCast<QGst::StateChangedMessage>(); 
       qDebug() << msg->source()->name() <<": State changed from " << stateString(msg->oldState()) 
         << " -> " << stateString(msg->newState()) << " Transitioning to " << stateString(msg->pendingState()); 
       if(msg->source()->name().compare("pipeline0")==0){ 
        if(msg->newState() == QGst::StatePaused){ 
         need_reset = true; 
         playing = false; 
        } else { 
         need_reset = false; 
         if(msg->newState() == QGst::StatePlaying){ 
          playing = true; 
         } 
        } 
       } 
       break; 
      } 
      case QGst::MessageStreamStatus: 
      { 
       QGlib::RefPointer<QGst::StreamStatusMessage> msg = message.staticCast<QGst::StreamStatusMessage>(); 
       qDebug() << "Stream Status: " << streamStateString(msg->statusType()); 
       break; 
      } 
      default: 
       qDebug() << "Unhandles Bus Message Type: " << message->typeName(); 
       break; 
     } 
    } 

    void Server::writeToHttpResource(QPointer<qhttp::server::QHttpResponse> res, QByteArray data) { 
     qDebug() << "Writing data..."; 
     res->write(data); 
    } 

UPDATE

Журнал Отладочная GStreamer докладывает что-то полезное ...

0:00:00.733339246 [335m 8248[00m  0x2747450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615 
    0:00:00.733350762 [335m 8248[00m  0x274a8f0 [37mDEBUG [00m [00m     tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f69d000fb80 
    0:00:00.733353629 [335m 8248[00m  0x2747450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f69d0015120, maxsize:4103 offset:0 size:4096 
    0:00:00.733361430 [335m 8248[00m  0x274a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f69d000fb80, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 15582, offset_end none, flags 0x0 

Размер входящих сетевых пакетов - 4096, но размер буфера составляет 1430 большую часть времени ... исследование .... Я думаю, что я должен получить доступ к буферам по-другому, чтобы получить доступ к необработанным данным.

Размер сетевых пакетов является 1430, 4096, кажется, буферная область ...

0:00:29.316301105 [335m 8248[00m  0x2747450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes 

Почему передача происходит медленно и бессмысленно остается загадкой сейчас. Почему он висит слишком!

UPDATE 2

Я также попытался таким образом, (Сообщил и те же размеры) одни и те же результаты:

QGst::FlowReturn MultiHttpSink::newSample() { 

     QGst::SamplePtr sample = pullSample(); 
     QGst::BufferPtr buffer = sample->buffer(); 

     qDebug() << "New Sample: size " << buffer->size(); 

     QGst::MapInfo map; 

     if (!buffer->map(map, QGst::MapRead)) 
      return QGst::FlowError; 

     qDebug() << "New Buffer Map: size " << map.size(); 


     for (auto it = resources.begin(); it != resources.end(); it++) 
      QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data())); 

     buffer->unmap(map); 

     return QGst::FlowOk; 
    } 

Здесь выход последней версии, так же, как и раньше:

... ... 
New Sample: size 1430 
New Buffer Map: size 1430 
QTime("23:34:33.709") 
QTime("23:34:33.709") 
Writing data... 
New Sample: size 4096 
New Buffer Map: size 4096 
QTime("23:34:33.709") 
QTime("23:34:33.709") 
Writing data... 
... ... 

ОБНОВЛЕНИЕ 3

Это бревно между каждым блоком данных:

New Sample: size 1430 
    New Buffer Map: size 1430 
    QTime("23:34:33.709") 
    QTime("23:34:33.709") 
    0:04:28.221638796 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:3566:gst_base_sink_chain_unlocked:<mysink2>[00m object unref after render 0x7f894400f850 
    0:04:28.221653137 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<mysink2:sink>[00m called chainfunction &gst_base_sink_chain with buffer 0x7f894400f850, returned ok 
    0:04:28.221667131 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m     tee gsttee.c:778:gst_tee_chain:<splitter>[00m handled buffer ok 
    0:04:28.221676164 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<splitter:sink>[00m called chainfunction &gst_tee_chain with buffer 0x7f894400f850, returned ok 
    0:04:28.221687738 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m  queue_dataflow gstqueue.c:1482:gst_queue_loop:<queue0>[00m queue is empty 
    0:04:28.222861204 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615 
    0:04:28.222896720 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944012fe0, maxsize:4103 offset:0 size:4096 
    0:04:28.222926663 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1482:gst_soup_http_src_got_chunk_cb:<souphttpsrc0>[00m got chunk of 1430 bytes 
    0:04:28.222956032 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2316:gst_base_src_do_sync:<souphttpsrc0>[00m no sync needed 
    0:04:28.222968263 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2520:gst_base_src_get_range:<souphttpsrc0>[00m buffer ok 
    0:04:28.222978663 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<queue0:sink>[00m calling chainfunction &gst_queue_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0 
    0:04:28.223004500 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4180:gst_pad_chain_data_unchecked:<queue0:sink>[00m called chainfunction &gst_queue_chain with buffer 0x7f894400f630, returned ok 
    0:04:28.223013700 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m  queue_dataflow gstqueue.c:1494:gst_queue_loop:<queue0>[00m queue is not empty 
    0:04:28.223031507 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2355:gst_base_src_update_length:<souphttpsrc0>[00m reading offset 20790622, length 4096, size -1, segment.stop -1, maxsize -1 
    0:04:28.223066379 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m    basesrc gstbasesrc.c:2456:gst_base_src_get_range:<souphttpsrc0>[00m calling create offset 20790622 length 4096, time 0 
    0:04:28.223056071 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<splitter:sink>[00m calling chainfunction &gst_tee_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0 
    0:04:28.223111831 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m     tee gsttee.c:774:gst_tee_chain:<splitter>[00m received buffer 0x7f894400f630 
    0:04:28.223121952 [335m28768[00m  0xd87450 [37mDEBUG [00m [00m   souphttpsrc gstsouphttpsrc.c:1430:gst_soup_http_src_chunk_allocator:<souphttpsrc0>[00m alloc 4096 bytes <= 18446744073709551615 
    0:04:28.223139071 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;35m  GST_SCHEDULING gstpad.c:4174:gst_pad_chain_data_unchecked:<mysink2:sink>[00m calling chainfunction &gst_base_sink_chain with buffer buffer: 0x7f894400f630, pts 99:99:99.999999999, dts 99:99:99.999999999, dur 99:99:99.999999999, size 1430, offset 20789192, offset_end none, flags 0x0 
    0:04:28.223152343 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:138:gst_memory_init:[00m new memory 0x7f8944015120, maxsize:4103 offset:0 size:4096 
    0:04:28.223179317 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:3409:gst_base_sink_chain_unlocked:<mysink2>[00m got times start: 99:99:99.999999999, end: 99:99:99.999999999 
    0:04:28.223208511 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:1958:gst_base_sink_get_sync_times:<mysink2>[00m got times start: 99:99:99.999999999, stop: 99:99:99.999999999, do_sync 0 
    0:04:28.223222869 [335m28768[00m  0xd87450 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944015120 
    0:04:28.223238437 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;04m    default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1) 
    0:04:28.223281513 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;04m    default gstsegment.c:731:gst_segment_to_running_time_full:[00m invalid position (-1) 
    0:04:28.223298205 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:3520:gst_base_sink_chain_unlocked:<mysink2>[00m rendering object 0x7f894400f630 
    0:04:28.223312428 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m   basesink gstbasesink.c:946:gst_base_sink_set_last_buffer_unlocked:<mysink2>[00m setting last buffer to 0x7f894400f630 
    0:04:28.223321131 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;34m   GST_MEMORY gstmemory.c:87:_gst_memory_free:[00m free memory 0x7f8944014080 
    0:04:28.223344396 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00;01;34m   GST_CAPS gstpad.c:2641:gst_pad_has_current_caps:<mysink2:sink>[00m check current pad caps (NULL) 
    0:04:28.223355939 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:760:gst_app_sink_render:<mysink2>[00m pushing render buffer 0x7f894400f630 on queue (0) 
    0:04:28.223369213 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:1286:gst_app_sink_pull_sample:<mysink2>[00m trying to grab a buffer 
    0:04:28.223377762 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:706:dequeue_buffer:<mysink2>[00m dequeued buffer 0x7f894400f630 
    0:04:28.223386131 [335m28768[00m  0xd8a8f0 [37mDEBUG [00m [00m    appsink gstappsink.c:1301:gst_app_sink_pull_sample:<mysink2>[00m we have a buffer 0x7f894400f630 
    New Sample: size 1430 
    New Buffer Map: size 1430 
    QTime("23:34:33.709") 
    QTime("23:34:33.709") 

ответ

0

Наконец-то я нашел грязную ошибку!

QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<qhttp::server::QHttpResponse>, *it), Q_ARG(QByteArray, (char *)map.data())); 

Даже если этот код компилируется он работает плохо, так как это:

Q_ARG(QByteArray, (char *)map.data()) 

Использование QByteArray (символ *) неправильно в этом случае, так как мы не инициализирует его из строки, а из кусок необработанных данных. Написанный, так как это был кусок, анализировался все время, ища новую комбинацию строк, произвольно разбивая кусок данных.

Чтобы избежать этого, используется следующий конструктор: QByteArray (char * ptr, int size).

Это правильный вариант метода:

QMetaObject::invokeMethod(&*server, "writeToHttpResource", Qt::QueuedConnection, Q_ARG(QPointer<GssClientRequest>, *it), Q_ARG(QByteArray, QByteArray((char *) map.data(), map.size()))); 

Был не так сильно ошибка, чтобы победить в конце концов;)

Спасибо за поддержку!

1

Добавить очереди перед тем decodebin и после тройника, тройник необходим контекст для запуска.

"souphttpsrc место = \" % 1 \»! " "очередь! Имя тройника = разветвитель" "разветвитель.! Очередь! Decodebin! Autoaudiosink" "разветвитель.! Очередь! Decodebin! Autovideosink" " splink. appsink name = \ "mysink \" "

Последний сплиттер, связанный с appsink, не нуждается в очереди, он будет запущен в контексте исходного плагина, созданного gsttask.

+0

Спасибо за повтор, я добавил очередь, как было предложено, но это не проблема, я собираюсь написать решение ниже! – Gianks

+0

Из любопытства, который я хотел знать, это работает для вас, не добавляя очереди, только имея решение, упомянутое ниже. –

+0

Нет, эта часть все еще не работает, конвейер застревает, если я добавлю что-нибудь еще в свои приложения. Я собираюсь открыть новый вопрос о SO. – Gianks