EDIT: Может быть, я неправильно понял ваш вопрос, я вижу, что вы говорите, что главный начнет много производителей goroutines. Я думал, что было много потребителя goroutines и одного производителя. Оставляя ответ здесь, если он может быть полезен для других, которые ищут этот шаблон, хотя пункты маркера все еще применяются к вашему делу.
Так что, если я правильно понимаю ваш прецедент, вы не можете ожидать отправки по каналу и сразу же прочитать результаты. Вы не знаете, когда рабочий обработает эту отправку, вам необходимо установить связь между goroutines, и это делается с помощью каналов. Предполагая, что просто вызов функции с возвращаемым значением не работает в вашем сценарии, если вам действительно нужно отправить работнику, затем заблокируйте, пока не получите результат, вы можете отправить канал как часть структуры данных, получить на него после отправки, то есть:
resCh := make(chan Result)
ch <- Data{key, value, resCh}
res := <- resCh
Но вы, вероятно, должны попытаться сломать работу в качестве трубопровода самостоятельных шагов вместо этого, увидеть сообщение в блоге, что я связан в первоначальном ответе.
Оригинальный ответ, где я думал, что это был один производитель - несколько потребителей/работников картина:
Это общая картина, для которой Гоу goroutines и каналы семантика очень хорошо подходит. Есть несколько вещей, которые вам нужно иметь в виду:
Основная функция не будет автоматически ждать завершения горутин. Если больше нечего делать в основном, программа выходит и у вас нет ваших результатов.
Глобальная карта, используемая вами не для потоковой передачи. Вам нужно синхронизировать доступ с помощью мьютекса, но есть лучший способ - использовать выходной канал для результатов, который уже синхронизирован.
Вы можете использовать канал ... по каналу, и вы можете безопасно обмениваться каналом между несколькими goroutines. Как мы увидим, эта модель довольно элегантна для написания.
площадка: https://play.golang.org/p/WqyZfwldqp
Более подробную информацию о Go трубопроводов и моделей параллелизма, ввести обработку ошибок, раннее списание и т.д.: https://blog.golang.org/pipelines
Комментарии код потребительной случае вы упоминаете:
// could be a command-line flag, a config, etc.
const numGoros = 10
// Data is a similar data structure to the one mentioned in the question.
type Data struct {
key string
value int
}
func main() {
var wg sync.WaitGroup
// create the input channel that sends work to the goroutines
inch := make(chan Data)
// create the output channel that sends results back to the main function
outch := make(chan Data)
// the WaitGroup keeps track of pending goroutines, you can add numGoros
// right away if you know how many will be started, otherwise do .Add(1)
// each time before starting a worker goroutine.
wg.Add(numGoros)
for i := 0; i < numGoros; i++ {
// because it uses a closure, it could've used inch and outch automaticaly,
// but if the func gets bigger you may want to extract it to a named function,
// and I wanted to show the directed channel types: within that function, you
// can only receive from inch, and only send (and close) to outch.
//
// It also receives the index i, just for fun so it can set the goroutines'
// index as key in the results, to show that it was processed by different
// goroutines. Also, big gotcha: do not capture a for-loop iteration variable
// in a closure, pass it as argument, otherwise it very likely won't do what
// you expect.
go func(i int, inch <-chan Data, outch chan<- Data) {
// make sure WaitGroup.Done is called on exit, so Wait unblocks
// eventually.
defer wg.Done()
// range over a channel gets the next value to process, safe to share
// concurrently between all goroutines. It exits the for loop once
// the channel is closed and drained, so wg.Done will be called once
// ch is closed.
for data := range inch {
// process the data...
time.Sleep(10 * time.Millisecond)
outch <- Data{strconv.Itoa(i), data.value}
}
}(i, inch, outch)
}
// start the goroutine that prints the results, use a separate WaitGroup to track
// it (could also have used a "done" channel but the for-loop would be more complex, with a select).
var wgResults sync.WaitGroup
wgResults.Add(1)
go func(ch <-chan Data) {
defer wgResults.Done()
// to prove it processed everything, keep a counter and print it on exit
var n int
for data := range ch {
fmt.Println(data.key, data.value)
n++
}
// for fun, try commenting out the wgResults.Wait() call at the end, the output
// will likely miss this line.
fmt.Println(">>> Processed: ", n)
}(outch)
// send work, wherever that comes from...
for i := 0; i < 1000; i++ {
inch <- Data{"main", i}
}
// when there's no more work to send, close the inch, so the goroutines will begin
// draining it and exit once all values have been processed.
close(inch)
// wait for all goroutines to exit
wg.Wait()
// at this point, no more results will be written to outch, close it to signal
// to the results goroutine that it can terminate.
close(outch)
// and wait for the results goroutine to actually exit, otherwise the program would
// possibly terminate without printing the last few values.
wgResults.Wait()
}
В реальных сценариях, где объем работ не известен заранее, закрытие в-канала может прийти от, например сигнал SIGINT. Просто убедитесь, что ни один кодовый путь не может отправлять работу после закрытия канала, поскольку это будет паниковать.
Чтение карты должно быть синхронизировано с письмами goroutines. Попробуйте добавить 'ch <- data' в качестве другого случая в предложении select. – Nadh