У нас есть процесс, посредством которого пользователи запрашивают файлы, которые нам нужно получить из нашего источника. Этот источник не самый надежный, поэтому мы внедрили очередь с использованием Amazon SQS. Мы помещаем URL-адрес загрузки в очередь, а затем мы опросили его с небольшим приложением, которое мы написали в Go. Это приложение просто извлекает сообщения, загружает файл и затем выталкивает его на S3, где мы его храним. Как только все это будет завершено, он вызовет службу, которая отправит электронное сообщение пользователю, чтобы сообщить им, что файл готов.Как выполнять параллельные загрузки в Go
Первоначально я написал это, чтобы создать каналы n, а затем подключил 1 рутину к каждому и имел рутину в бесконечном цикле. Таким образом, я мог убедиться, что я только обрабатывал фиксированное количество загрузок за раз.
Я понял, что это не так, как предполагается, что каналы используются, и, если я сейчас правильно понимаю, на самом деле должен быть один канал с n go-routines, получающий на этом канале. Каждый ход-подпрограмма находится в бесконечном цикле, ожидая сообщения, и когда он получит его, он обработает данные, сделает все, что он должен, и когда это будет сделано, он будет ждать следующего сообщения. Это позволяет мне гарантировать, что я только обрабатываю n файлов за раз. Я думаю, что это правильный способ сделать это. Я считаю, что это fan-out, правильно?
Что мне нужно Не нужно делать, чтобы объединить эти процессы вместе. Как только загрузка будет выполнена, он перезвонит удаленную службу, чтобы обрабатывать оставшуюся часть процесса. Нет ничего, что нужно сделать приложению.
ОК, так что некоторый код:
func main() {
queue, err := ConnectToQueue() // This works fine...
if err != nil {
log.Fatalf("Could not connect to queue: %s\n", err)
}
msgChannel := make(chan sqs.Message, 10)
for i := 0; i < MAX_CONCURRENT_ROUTINES; i++ {
go processMessage(msgChannel, queue)
}
for {
response, _ := queue.ReceiveMessage(MAX_SQS_MESSAGES)
for _, m := range response.Messages {
msgChannel <- m
}
}
}
func processMessage(ch <-chan sqs.Message, queue *sqs.Queue) {
for {
m := <-ch
// Do something with message m
// Delete message from queue when we're done
queue.DeleteMessage(&m)
}
}
ли я где-нибудь близко здесь? У меня есть n running go-routines (где MAX_CONCURRENT_ROUTINES
= n), и в цикле мы будем передавать сообщения в один канал. Правильно ли это? Мне нужно закрыть что-нибудь или я могу просто оставить это бег на неопределенный срок?
Одна вещь, которую я замечаю, заключается в том, что SQS возвращает сообщения, но как только у меня было 10 сообщений, переданных в processMessage()
(10 - размер буфера канала), что никакие другие сообщения фактически не обрабатываются.
Все
Спасибо @ matt-joiner, конечно, я вернулся ... Раньше у меня была одна процедура для каждого сообщения, и они возвращались, когда сделанный. Когда я переместил его только на 10, я забыл изменить 'return' на' continue'. –
ОК, я закончил обработку вашего ответа, теперь я исправил проблему 'return' /' continue'. Спасибо за предложение о семафорах; Я буду читать по этому вопросу. Мне не нужны эти рабочие, нет. Они должны просто обработать сообщение, содержащее URI обратного вызова, поэтому они просто вызывают это, а затем ждут следующего сообщения. Однако оцените указатели на 'sync.WaitGroup' и 2-кортеж возвратного канала. В очередной раз благодарим за помощь –