2017-02-14 8 views
2

У меня есть приложение Function в Azure, которое запускается, когда элемент помещается в очередь. Это выглядит примерно так:Ограничение количества параллельных заданий в очереди Azure Functions

public static async Task Run(string myQueueItem, TraceWriter log) 
{ 
    using (var client = new HttpClient()) 
    { 
     client.BaseAddress = new Uri(Config.APIUri); 
     client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); 

     StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json"); 
     HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent); 
     response.EnsureSuccessStatusCode(); 

     string json = await response.Content.ReadAsStringAsync(); 
     ApiResponse apiResponse = JsonConvert.DeserializeObject<ApiResponse>(json); 

     log.Info($"Activity data successfully sent to platform in {apiResponse.elapsed}ms. Tracking number: {apiResponse.tracking}"); 
    } 
} 

Все это прекрасно работает и работает очень хорошо. Каждый раз, когда элемент помещается в очередь, мы отправляем данные в некоторый API на нашей стороне и регистрируем ответ. Круто.

Проблема возникает, когда есть большой всплеск в «вещью, которая генерирует сообщения в очереди», и сразу много предметов помещается в очередь. Это, как правило, происходит примерно от 1000 до 1500 штук за минуту. Журнал ошибок будет иметь что-то вроде этого:

2017-02-14T01: 45: 31,692 mscorlib: Исключение при выполнении функции: Functions.SendToLimeade. f-SendToLimeade __- 1078179529: при отправке запроса произошла ошибка . Система: невозможно подключиться к удаленному серверу . Система: разрешено только одно использование адреса каждого сокета (протокол/сетевой адрес/порт) 123.123.123.123:443.

Во-первых, я думал, что это проблема с приложением Azure Function, работающим из локальных сокетов, как illustrated here. Однако затем я заметил IP-адрес. IP-адрес 123.123.123.123 (конечно, измененный для этого примера) - это наш IP-адрес, который отправляет HttpClient. Итак, теперь мне интересно, не наших серверов, заканчивающих работу сокетов для обработки этих запросов.

В любом случае, у нас есть проблема масштабирования, происходящая здесь. Я пытаюсь найти лучший способ решить эту проблему.

Некоторые идеи:

  1. Если это локальное ограничение гнездо, то article above есть пример увеличения локального диапазона портов с помощью Req.ServicePoint.BindIPEndPointDelegate. Это кажется многообещающим, но что вы делаете, когда вы действительно нужно масштабировать? Я не хочу, чтобы эта проблема возвращалась через 2 года.
  2. Если это дистанционное ограничение, похоже, я могу контролировать, сколько сообщений обрабатывает время выполнения функций. Здесь есть интересная статья, в которой говорится, что вы можете установить serviceBus.maxConcurrentCalls в 1, и только одно сообщение будет обработано сразу. Возможно, я мог бы установить это относительно небольшое число. Теперь, в какой-то момент наша очередь будет заполняться быстрее, чем мы можем их обработать, но в этот момент ответ добавляет больше серверов на нашем конце.
  3. Несколько приложений с функциями Azure? Что произойдет, если у меня есть несколько приложений Azure Functions, и все они запускаются в одной очереди? Является ли Azure достаточно умным, чтобы разделить работу среди приложений Function, и я мог бы заставить армию машин обрабатывать мою очередь, которую можно было бы масштабировать или уменьшать по мере необходимости?
  4. Я также встречаюсь с живыми существами. Мне кажется, если бы я мог как-то сохранить мой сокет открытым, когда набирали сообщения о очереди, возможно, это может сильно помочь. Возможно ли это, и какие-нибудь советы о том, как я буду заниматься этим?

Любое понимание рекомендованной (масштабируемой!) Конструкции для такого рода систем было бы очень полезно!

ответ

1

Я думаю, что я нашел решение для этого. Я выполнял эти изменения за прошедшее время 3 часа 6 часов, и у меня были нулевые ошибки сокета. Прежде чем я получаю эти ошибки в больших партиях каждые 30 минут или около того.

Во-первых, я добавил новый класс для управления HttpClient.

public static class Connection 
{ 
    public static HttpClient Client { get; private set; } 

    static Connection() 
    { 
     Client = new HttpClient(); 

     Client.BaseAddress = new Uri(Config.APIUri); 
     Client.DefaultRequestHeaders.Add("Connection", "Keep-Alive"); 
     Client.DefaultRequestHeaders.Add("Keep-Alive", "timeout=600"); 
     Client.DefaultRequestHeaders.Accept.Add(new MediaTypeWithQualityHeaderValue("application/json")); 
    } 
} 

Теперь у нас есть статический экземпляр HttpClient, который мы используем для каждого вызова функции. Из моих исследований настоятельно рекомендуется хранить экземпляры HttpClient как можно дольше, все поточно-безопасные, а HttpClient будет ставить в очередь запросы и оптимизировать запросы к одному и тому же хосту. Заметьте, я также установил заголовки Keep-Alive (я думаю, что это значение по умолчанию, но я решил, что буду неявным).

В моей функции, я просто захватить статический экземпляр HttpClient как:

var client = Connection.Client; 
StringContent httpContent = new StringContent(myQueueItem, Encoding.UTF8, "application/json"); 
HttpResponseMessage response = await client.PostAsync("/api/devices/data", httpContent); 
response.EnsureSuccessStatusCode(); 

Я на самом деле не сделал какой-либо глубокий анализ того, что происходит на уровне сокета (я должен спросить наш ИТ-ребята, если они смогут увидеть этот трафик на балансировщике нагрузки), но я надеюсь, что он просто держит один сокет открытым для нашего сервера и создает кучу HTTP-вызовов, когда обрабатываются элементы очереди. В любом случае, что бы это ни было, похоже, работает. Может быть, у кого-то есть мысли о том, как улучшить.

1

Если вы используете план потребления вместо функций в выделенном веб-приложении, # 3 более или менее происходит из коробки. Функции обнаруживают, что у вас большая очередь сообщений, и будет добавлять экземпляры до тех пор, пока длина очереди не стабилизируется.

maxConcurrentCalls применяется только к каждому экземпляру, что позволяет ограничить параллелизм между экземплярами. В основном, ваша скорость обработки составляет .

Единственный способ контролировать глобальную пропускную способность - использовать функции в выделенных веб-приложениях по размеру, который вы выберете. Каждое приложение будет опросить очередь и захватить работу по мере необходимости.

Лучшее решение для масштабирования улучшит балансировку нагрузки на 123.123.123.123, чтобы он мог обрабатывать любое количество запросов от масштабирования функций вверх/вниз для удовлетворения давления в очереди.

Keep afaik полезен для постоянных соединений, но выполнение функций не рассматривается как постоянное соединение. В будущем мы пытаемся добавить «привязать вашу привязку» к функциям, что позволит вам реализовать пул соединений, если вам понравится.

+0

Прямо сейчас, похоже, это уже с помощью четырех различных узлов (в соответствии с Нью-Relic) для обработки масштабирования. Итак, я думаю, что часть добавления новых приложений функций не требуется; он уже делает это. Я до сих пор неясно, является ли сообщение об ошибке сокета о том, что локальный экземпляр на Azure отсутствует в сокетах или наши серверы находятся вне сокетов. –

+0

Добавил мой собственный ответ ниже; это, кажется, работает для нас до сих пор! –

+0

Этот аналогичный вопрос/ответ может также представлять интерес: https://stackoverflow.com/questions/40094041/throttling-azure-storage-queue-processing-in-azure-function-app – Alex

0

Я знаю, что на вопрос был дан ответ давно, но в то же время Microsoft документировала анти-шаблон, который вы использовали.

Improper Instantiation antipattern

 Смежные вопросы

  • Нет связанных вопросов^_^