2015-09-07 6 views
4

У нас есть процесс, посредством которого пользователи запрашивают файлы, которые нам нужно получить из нашего источника. Этот источник не самый надежный, поэтому мы внедрили очередь с использованием Amazon SQS. Мы помещаем URL-адрес загрузки в очередь, а затем мы опросили его с небольшим приложением, которое мы написали в Go. Это приложение просто извлекает сообщения, загружает файл и затем выталкивает его на S3, где мы его храним. Как только все это будет завершено, он вызовет службу, которая отправит электронное сообщение пользователю, чтобы сообщить им, что файл готов.Как выполнять параллельные загрузки в Go

Первоначально я написал это, чтобы создать каналы n, а затем подключил 1 рутину к каждому и имел рутину в бесконечном цикле. Таким образом, я мог убедиться, что я только обрабатывал фиксированное количество загрузок за раз.

Я понял, что это не так, как предполагается, что каналы используются, и, если я сейчас правильно понимаю, на самом деле должен быть один канал с n go-routines, получающий на этом канале. Каждый ход-подпрограмма находится в бесконечном цикле, ожидая сообщения, и когда он получит его, он обработает данные, сделает все, что он должен, и когда это будет сделано, он будет ждать следующего сообщения. Это позволяет мне гарантировать, что я только обрабатываю n файлов за раз. Я думаю, что это правильный способ сделать это. Я считаю, что это fan-out, правильно?

Что мне нужно Не нужно делать, чтобы объединить эти процессы вместе. Как только загрузка будет выполнена, он перезвонит удаленную службу, чтобы обрабатывать оставшуюся часть процесса. Нет ничего, что нужно сделать приложению.

ОК, так что некоторый код:

func main() { 
    queue, err := ConnectToQueue() // This works fine... 
    if err != nil { 
     log.Fatalf("Could not connect to queue: %s\n", err) 
    } 

    msgChannel := make(chan sqs.Message, 10) 

    for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ { 
     go processMessage(msgChannel, queue) 
    } 

    for { 
     response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES) 

     for _, m := range response.Messages { 
      msgChannel <- m 
     } 
    } 
} 

func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) { 
    for { 
     m := <-ch 
     // Do something with message m 

     // Delete message from queue when we're done 
     queue.DeleteMessage(&m) 
    } 
} 

ли я где-нибудь близко здесь? У меня есть n running go-routines (где MAX_CONCURRENT_ROUTINES = n), и в цикле мы будем передавать сообщения в один канал. Правильно ли это? Мне нужно закрыть что-нибудь или я могу просто оставить это бег на неопределенный срок?

Одна вещь, которую я замечаю, заключается в том, что SQS возвращает сообщения, но как только у меня было 10 сообщений, переданных в processMessage() (10 - размер буфера канала), что никакие другие сообщения фактически не обрабатываются.

Все

ответ

3

Это выглядит хорошо. Несколько примечаний:

  1. Вы можете ограничить параллелизм работой другими способами, кроме ограничения числа рабочих процедур, которые вы запускаете. Например, вы можете создать goroutine для каждого полученного сообщения, а затем, чтобы порожденный goroutine ждал семафора, который ограничивает параллелизм. Конечно, есть компромиссы, но вы не ограничены только тем, как вы описали.

    sem := make(chan struct{}, n) 
    work := func(m sqs.Message) { 
        sem <- struct{}{} // When there's room we can proceed 
        // do the work 
        <-sem // Free room in the channel 
    }() 
    for _, m := range queue.ReceiveMessage(MAX_SQS_MESSAGES) { 
        for _, m0 := range m { 
         go work(m0) 
        } 
    } 
    
  2. Предел только 10 сообщений, обрабатываемых причиняется в другом месте в стеке. Возможно, вы видите гонку, где первые 10 заполняют канал, а затем работа не завершается, или, может быть, вы случайно возвращаетесь из рабочих процедур. Если ваши сотрудники настойчиво относятся к описанной вами модели, вам нужно быть уверенным, что они не вернутся.

  3. Непонятно, хотите ли вы вернуть процесс после обработки некоторого количества сообщений. Если вы хотите, чтобы этот процесс завершился, вам нужно подождать, пока все работники закончат свои текущие задачи, и, возможно, предупредит их о возвращении. Взгляните на sync.WaitGroup для синхронизации их завершения и с другим каналом, чтобы сигнализировать, что больше нет работы или закрыть msgChannel, и обрабатывайте это в своих рабочих. (Взгляните на выражение приема обратного канала с 2-мя корнями.)

+0

Спасибо @ matt-joiner, конечно, я вернулся ... Раньше у меня была одна процедура для каждого сообщения, и они возвращались, когда сделанный. Когда я переместил его только на 10, я забыл изменить 'return' на' continue'. –

+0

ОК, я закончил обработку вашего ответа, теперь я исправил проблему 'return' /' continue'. Спасибо за предложение о семафорах; Я буду читать по этому вопросу. Мне не нужны эти рабочие, нет. Они должны просто обработать сообщение, содержащее URI обратного вызова, поэтому они просто вызывают это, а затем ждут следующего сообщения. Однако оцените указатели на 'sync.WaitGroup' и 2-кортеж возвратного канала. В очередной раз благодарим за помощь –