2016-06-06 6 views
2

Я пишу базовый загрузчик Http Live Stream (HLS), где я повторно загружаю медиа-плейлист m3u8 с интервалом, заданным «# EXT-X-TARGETDURATION», а затем загружаем сегменты * .ts, поскольку они становятся доступными.Как убедиться, что данные нескольких загрузок Async сохранены в том порядке, в котором они были запущены?

Это то, что мог бы выглядеть при воспроизведении медиа плейлиста m3u8.

#EXTM3U 
#EXT-X-VERSION:3 
#EXT-X-TARGETDURATION:12 
#EXT-X-MEDIA-SEQUENCE:1 
#EXTINF:7.975, 
http://website.com/segment_1.ts 
#EXTINF:7.941, 
http://website.com/segment_2.ts 
#EXTINF:7.975, 
http://website.com/segment_3.ts 

загрузить эти * .ts сегменты все в то же время с HttpClient асинхронной/ОЖИДАНИЕ хотелось бы. Сегменты не имеют одинакового размера, поэтому, несмотря на то, что сначала загружается загрузка «segment_1.ts», она может завершиться после двух других сегментов.

Эти сегменты являются частью одного большого видео, поэтому важно, чтобы данные загруженных сегментов записывались в том порядке, в котором они были запущены, а не в том порядке, в котором они были закончены.

Мой код ниже работает отлично, если сегменты загружаются один за другим, но не тогда, когда несколько сегментов загружаются одновременно, потому что иногда они не заканчиваются в том порядке, в котором они были запущены.

Я думал об использовании Task.WhenAll, который гарантирует правильный порядок, но я не хочу, чтобы загруженные сегменты в памяти излишне, потому что они могут быть размером в несколько мегабайт. Если загрузка «segment_1.ts» сначала заканчивается, тогда ее нужно сразу записать на диск, не дожидаясь завершения остальных сегментов. Написание всех сегментов * .ts для разделения файлов и присоединения к ним в конце также не является вариантом, поскольку для этого потребуется двойное дисковое пространство, а общее видео может иметь размер в несколько гигабайт.

Я понятия не имею, как это сделать, и мне интересно, может ли кто-нибудь помочь мне в этом. Я ищу способ, который не требует, чтобы я вручную создавал потоки или блокировал поток ThreadPool в течение длительного периода времени.

Некоторые из кода и обработки исключений были удалены, чтобы было легче увидеть, что происходит.

// Async BlockingCollection from the AsyncEx library 
private AsyncCollection<byte[]> segmentDataQueue = new AsyncCollection<byte[]>(); 

public void Start() 
{ 
    RunConsumer(); 
    RunProducer(); 
} 

private async void RunProducer() 
{ 
    while (!_isCancelled) 
    { 
     var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false); 
     var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false); 

     string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries); 
     if (!lines.Any() || lines[0] != "#EXTM3U") 
      throw new Exception("Invalid m3u8 media playlist."); 

     for (var i = 1; i < lines.Length; i++) 
     { 
      var line = lines[i]; 
      if (line.StartsWith("#EXT-X-TARGETDURATION")) 
      { 
       ParseTargetDuration(line); 
      } 
      else if (line.StartsWith("#EXT-X-MEDIA-SEQUENCE")) 
      { 
       ParseMediaSequence(line); 
      } 
      else if (!line.StartsWith("#")) 
      { 
       if (_isNewSegment) 
       { 

        // Fire and forget 
        DownloadTsSegment(line); 

       } 
      } 
     } 

     // Wait until it's time to reload the m3u8 media playlist again 
     await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false); 
    } 
} 

// async void. We never await this method, so we can download multiple segments at once 
private async void DownloadTsSegment(string tsUrl) 
{ 
    var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false); 
    var data = await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false); 

    // Add the downloaded segment data to the AsyncCollection 
    await segmentDataQueue.AddAsync(data, _cts.Token).ConfigureAwait(false); 
} 

private async void RunConsumer() 
{ 
    using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read)) 
    { 
     while (!_isCancelled) 
     { 
      // Wait until new segment data is added to the AsyncCollection and write it to disk 
      var data = await segmentDataQueue.TakeAsync(_cts.Token).ConfigureAwait(false); 
      await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false); 
     } 
    } 
} 
+0

Я не думаю, что есть другой способ, чем то, что вы уже знаете. 1) сохранять сегменты на диск и присоединяться, когда все сегменты закончены. 2) Гарантийный заказ с использованием 'Task.WhenAll' 3) Гарантийный заказ, не используя Fire и Forget, но используйте ожидание для каждой загрузки сегмента. Каждый из трех вариантов имеет свои преимущества, но также и недостатки. Здесь нет волшебной пули, вам нужно будет выбрать решение, которое наиболее точно соответствует тому, с которым вы сможете жить/соответствует требованиям. – Igor

ответ

3

Я не думаю, что вам нужна очередь производителей/потребителей. Однако, я думаю, вы должны избегать «огня и забыть».

Вы можете запустить их все одновременно и просто обработать их по мере их завершения.

Во-первых, определить, как загрузить один сегмент:

private async Task<byte[]> DownloadTsSegmentAsync(string tsUrl) 
{ 
    var response = await _client.GetAsync(tsUrl, _cts.Token).ConfigureAwait(false); 
    return await response.Content.ReadAsByteArrayAsync().ConfigureAwait(false); 
} 

Затем добавить разборе плейлиста, который приводит к списку загрузок сегмента (которые все в процессе уже):

private List<Task<byte[]>> DownloadTasks(string data) 
{ 
    var result = new List<Task<byte[]>>(); 
    string[] lines = data.Split(new string[] { "\n" }, StringSplitOptions.RemoveEmptyEntries); 
    if (!lines.Any() || lines[0] != "#EXTM3U") 
    throw new Exception("Invalid m3u8 media playlist."); 
    ... 
      if (_isNewSegment) 
      { 
      result.Add(DownloadTsSegmentAsync(line)); 
      } 
    ... 
    return result; 
} 

Употребляйте этот список по одному (в порядке) путем записи в файл:

private async Task RunConsumerAsync(List<Task<byte[]>> downloads) 
{ 
    using (FileStream fs = new FileStream(_filePath, FileMode.Create, FileAccess.Write, FileShare.Read)) 
    { 
    for (var task in downloads) 
    { 
     var data = await task.ConfigureAwait(false); 
     await fs.WriteAsync(data, 0, data.Length).ConfigureAwait(false); 
    } 
    } 
} 

И пинать все это с продюсером:

public async Task RunAsync() 
{ 
    // TODO: consider CancellationToken instead of a boolean. 
    while (!_isCancelled) 
    { 
    var response = await _client.GetAsync(_playlistBaseUri + _playlistFilename, _cts.Token).ConfigureAwait(false); 
    var data = await response.Content.ReadAsStringAsync().ConfigureAwait(false); 

    var tasks = DownloadTasks(data); 
    await RunConsumerAsync(tasks); 

    await Task.Delay(_targetDuration * 1000, _cts.Token).ConfigureAwait(false); 
    } 
} 

Обратите внимание, что это решение делает запустить все загрузки одновременно, и это может вызвать давление памяти. Если это проблема, я рекомендую вам реструктурировать использование потока данных TPL, который имеет встроенную поддержку дросселирования.

+0

'memory pressure 'OP указал, что его файлы могут быть размерами GB. Настолько большой, что он даже хочет использовать временные файлы. – usr

+0

Большое спасибо, что работает отлично. Единственная небольшая проблема заключается в том, что Task.Delay в методе RunAsync выполняется слишком поздно. В примере плейлиста m3u8, который я опубликовал выше, продолжительность цели составляет 12 секунд. Список воспроизведения должен быть повторно загружен каждые 12 секунд, независимо от того, завершена ли загрузка сегментов или нет. С текущим кодом он сначала загружает сегменты, а затем ждет 12 секунд. В результате сегменты пропадают из повторно загруженного списка воспроизведения, потому что он был повторно загружен слишком поздно. –

+0

@usr Я на самом деле пытаюсь избежать временных файлов. Я могу получить сотни или даже более 1000 файлов temp, каждый из которых составляет от 0,5 до 3 МБ. Когда поток в реальном времени закончен, все эти временные файлы должны быть соединены, требуя двойного дискового пространства. –

1

Присвоить каждому закачку порядковый номер. Поместите результаты в Dictionary<int, byte[]>. Каждый раз, когда загрузка завершается, он добавляет свой собственный результат.

Затем он проверяет, есть ли сегменты записи на диск:

while (dict.ContainsKey(lowestWrittenSegmentNumber + 1)) { 
WriteSegment(dict[lowestWrittenSegmentNumber + 1]); 
lowestWrittenSegmentNumber++; 
} 

Таким образом, все сегменты в конечном итоге на диске, в порядке и с буферизацией.


RunConsumer(); 
RunProducer(); 

Обязательно используйте async Task, так что вы можете ждать завершения с await Task.WhenAll(RunConsumer(), RunProducer());. Но вам больше не нужно будет RunConsumer.

 Смежные вопросы

  • Нет связанных вопросов^_^