2017-02-01 8 views
2

Каждый день у меня будет выполнение задачи CRON, которая заполняет очередь SQS множеством задач, которые должны быть достигнуты. Так (например) в 9 утра каждое утро, а пустая очередь получит ~ 100 сообщений, которые нужно будет обработать.Ограничение скорости Работник для очереди (например: SQS)

Я хотел бы, чтобы новый работник вращался каждую секунду, пока очередь не пуста. Если какая-либо задача выходит из строя, она помещается в обратную сторону очереди для повторного запуска.

Например, если каждая задача занимает до 1,5 секунд, чтобы закончить: второй

  • после того, как 1, 1 работник начал ообщение
  • через 2 секунды, 1 работник может все еще быть запущен ообщение и 1 работник начнет выполнение сообщения B
  • через 100 секунд, 1 рабочий может все еще работать с сообщением XX, а 1 работник будет получать сообщение B, потому что он не прошел предыдущий
  • через 101 секунду больше рабочих не распространялось до тех пор, пока следующим утром

Есть ли какой-либо способ настроить этот тип инфраструктуры в рамках AWS лямбда?

+0

Это интересный прецедент. Можете ли вы дать нам представление о том, почему необходимо ограничение 1-й скорости (даже в широком смысле)? Это может быть выполнено с помощью Executor, который генерирует 1 поток в секунду и обрабатывает ровно 1 опрос SQS, а затем работает неудавшаяся очередь, если не пустая, - но мне все еще интересно, где это было бы желательно. Благодаря! –

+0

Мы используем его для связи с сторонним API, который ограничивает использование нами своих услуг максимум одним запросом в секунду. – bashaus

ответ

1

Кажется, что вам лучше опубликовать ваши сообщения в SNS вместо SQS, а затем ваши лямбда-функции будут подписаны на тему SNS.

Позвольте Лямбде беспокоиться о том, сколько «экземпляров» нужно раскрутить в ответ на нагрузку.

Это одно сообщение блога об этом методе, но Google может помочь вам найти тот, который ближе к вашему фактическому прецеденту.

https://aws.amazon.com/blogs/mobile/invoking-aws-lambda-functions-via-amazon-sns/

+0

Вызов - это не проблема для меня в данный момент, это ограничение скорости, это проблема. Любые предложения по этому аспекту? – bashaus

+0

Можете ли вы объяснить, почему вам нужно ограничить скорость? Этот ответ может привести к лучшему ответу. –

+0

В основном хотите иметь возможность отправлять сообщения API, который имеет необоснованный предел скорости – bashaus

1

Один из способов, хотя я не уверен, что это оптимальный:

Лямбда, который запускается с помощью CloudWatch события (скажем, каждый второй, или через каждые 10 секунд, в зависимости от вашего лимита скорости). Какой опрос SQS для приема (не более) N сообщений, он затем «отгораживается» от другой функции лямбда с каждым сообщением.


Некоторые псевдо-код:

# Lambda 1 (schedule by CloudWatch Event/e.g. CRON) 
def handle_cron(event, context): 
    # in order to get more messages, we might have to receive several times (loop) 
    for message in queue.receive_messages(MaxNumberOfMessages=10): 
     # Note: the Event InvocationType so we don't want to wait for the response! 
     lambda_client.invoke(FunctionName="foo", Payload=message.body, InvocationType='Event') 

и

# Lambda 2 (triggered only by the invoke in Lambda 1) 
def handle_message(event, context): 
    # handle message 
    pass 
+0

+1. Мне действительно нравится этот дизайн. Это намного лучше, чем настройка видимости каждого сообщения, когда вы достигаете предела API. – sfratini

0

Почему не только имеют функцию лямбда, которая начинается опроса SQS в 9 утра, получая одно сообщение в то время, и спать для второе между каждым сообщением? Очереди мертвой буквы могут обрабатывать повторы. Прекратите выполнение, не получив сообщение от SQS через x секунд.

Это уникальный случай, когда вы фактически не нуждаетесь в параллельной обработке.

+0

Думаю, у вас также могут быть события N Cloudwatch с одним и тем же CRON ... чтобы ограничить N секунды, когда вы говорите «Остановить выполнение», вы имеете в виду отключить событие (ы)? –

+0

То, что я имею в виду, имеет функцию лямбда, которая опросает очередь SQS. После получения сообщения он должен обработать его, а затем спать на секунду. Затем выполните опрос для другого сообщения. Если вы опросили сообщения и не получили каких-либо за х секунд, прекратите выполнение лямбда-функции до следующего cron. – Mark