2015-10-13 4 views
-2

Я следующий код, где я пытаюсь вызвать АНИ 10000 раз, но я получаю ошибки:Как запустить 10000 goroutines параллельно, где каждая процедура вызывает api?

package main 

import (
    "fmt" 

    "net/http" 
    "runtime" 
    "sync" 
    "time" 
) 

func main() { 

    nCPU := runtime.NumCPU() 
    runtime.GOMAXPROCS(nCPU) 

    var wg sync.WaitGroup 
    totalRequests := 100000 
    wg.Add(totalRequests) 

    fmt.Println("Starting Go Routines") 

    start := time.Now() 
    total := 0 

    for i := 0; i < totalRequests; i++ { 

     go func(current int) { 
      defer wg.Done() 

      startFunc := time.Now() 
      _, err := http.Get("http://127.0.0.1:8080/event/list") 
      // resp, err := http.Get("https://graph.facebook.com/v2.4/me" + "?fields=id%2Cname&access_token=" + "CAACEdEose0cBAEpQvcsvVMQu5oZCyyDjcEPQi9yCdiXimm4F0AYexGHPZAJHgpyrFOJN5X1VMcicNJjlkaCquUqHMZAfRrtxx6K9cRIROrA0OmbqAqCcg8ZA3qJZCHCl68I1n4LtFb5qxPcotlP5ne5PBakK0OUa7sc6FAOWwByOnFtNZBpIe8XDeM4YFa33sDfftVUpZCoBgZDZD") 

      if err != nil { 
       fmt.Println(err) 
      } 
      // defer resp.Body.Close() 
      elapsedFunc := time.Since(startFunc) 
      fmt.Println("The request (", current, ") took", elapsedFunc, "No of requests completed", total) 
      total++ 

     }(i) 

    } 

    wg.Wait() 
    elapsed := time.Since(start) 
    fmt.Println("\nThe total time with cores", elapsed) 
    fmt.Println("\nTerminating Program") 
} 

Ошибки я получаю:

Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files 
The request (5390) took 1.619876633s No of requests completed 2781 
Get http://127.0.0.1:8080/event/list: dial tcp 127.0.0.1:8080: socket: too many open files 
The request (7348) took 650.609825ms No of requests completed 1445 
+9

вы пытаетесь открыть слишком много сокетов. Либо ограничивайте одновременные запросы, либо увеличивайте пределы процесса. – JimB

+5

вы также увеличиваете общее количество небезопасным способом, это легко не будет увеличивать количество прогонов. Используйте atomic.AddInt64 (& total) (и измените на total: = int64 (0)) –

+1

@evanmcdonnal: эта ошибка вызвана исчерпанием файловых дескрипторов. Не имеет никакого отношения к удаленному хосту. – JimB

ответ

3

Как другие, упомянутые в комментариях , ваша основная проблема заключается в том, что вы превысили ограничение открытого файла процесса.

Вы можете легко реализовать семафор с помощью каналов, чтобы ограничить параллелизм:

totalRequests := 100000 
concurrency := 1024 
sem := make(chan bool, concurrency) 

start := time.Now() 
total := int32(0) 

for i := 0; i < totalRequests; i++ { 
    sem <- true 

    go func(current int) { 
     startTime := time.Now() 

     // Make request here 

     elapsedTime := time.Since(startTime) 
     atomic.AddInt32(&total, 1) 
     fmt.Printf("Request %d took %s. Requests completed: %d\n", current, elapsedTime, atomic.LoadInt32(&total)) 

     <-sem 
    }(i) 
} 

for i := 0; i < cap(sem); i++ { 
    sem <- true 
} 
elapsedTotal := time.Since(start) 
fmt.Printf("\nTotal time elapsed: %s\n", elapsedTotal) 

Это позволит ограничить количество параллельных запросов на то, что указано в concurrency.

Как вы можете видеть, переменная total увеличивается с помощью atomic пакета, так как мы изменяем эту переменную из потенциально параллельных goroutines, которые могли бы произвели неверную сумму при модификации ненадежно, как вы это делали.

Смотрите этот блог для исходного примера & объяснения ограничения параллелизма в Go: http://jmoiron.net/blog/limiting-concurrency-in-go

EDIT:

Как отметил JimB ниже, еще один общий подход заключается в concurrency числе goroutines делает в то время как мы кормем их им. Вот общая do функция, которая одна может использовать для этого:

func do(total, concurrency int, fn func(int)) { 
    workQueue := make(chan int, concurrency) 

    var wg sync.WaitGroup 
    wg.Add(concurrency) 

    for i := 0; i < concurrency; i++ { 
     go func() { 
      for i := range workQueue { 
       fn(i) 
      } 
      wg.Done() 
     }() 
    } 
    for i := 0; i < total; i++ { 
     workQueue <- i 
    } 
    close(workQueue) 
    wg.Wait() 
} 

Мы нерест concurrency goroutines, а затем начать посылать значения в workQueue канал, пока total не посылается. Закрывая канал workQueue, мы фактически заканчиваем циклы диапазонов в наших гортанах. После этого мы просто подождем, пока все оставшиеся горуты не закончатся.

Для случая использования в вопросе, он может быть использован, как это:

totalRequests := 1000000 
concurrency := 1024 

do(totalRequests, concurrency, func(i int) { 
    // Make request here 

    fmt.Printf("Request %d done.\n", i) 
}) 
+1

Другим более распространенным шаблоном является наличие «параллелизма» числа рабочих городов, расположенных по каналу, вместо того, чтобы каждый раз генерировать новую горутинку. – JimB

+0

@JimB Вправо добавится дополнительный пример. –