2017-01-18 7 views
4

У меня есть прецедент, где у меня есть функция AWS Step, которая запускается, когда файл загружается на S3, откуда первый шаг запускает ffprobe, чтобы получить длительность файла из внешняя служба, такая как transloadit, где выход записывается обратно на S3.AWS Step Function - Дождитесь события

Я могу создать новую функцию шага из этого события, но я блуждал, если в исходной функции шага можно выполнить обещание Await, а затем перейти к следующему - учитывая, что это может занять больше времени для ffprobe для возвращения.

Любые советы очень ценятся, как справиться с этим.

ответ

1

Невозможно предложить простое решение, только несколько направлений для изучения.

Во-первых, функции Step имеют особый способ обработки долгого рабочего фона: действия. https://docs.aws.amazon.com/step-functions/latest/dg/concepts-activities.html Это в основном очередь.

Если вы хотите 100% -ный сервер без сервера, это будет сложно или уродливо.

  • либо, как вы сказали, создать новую ступенчатую функцию для каждого файла
  • или, S3 цикла опроса в государственной машине, используя пользовательский код ошибки и Retry пункт

Если вы можете выделить «1/8 micro "для фонового работника это не изящно, но легко и может быть реализовано с мгновенной реакцией. Низкое требование к оборудованию подсказывает, что мы будем использовать машину только для синхронизации.

Определить деятельность StepFunction, названную, например, video-duration. Определите очередь SQS для мгновенной реакции или опроса S3 для результатов продолжительности.

Государственная функция псевдокод:

{ 
    StartAt: ffprobe 
    ffprobe: { 
    Type: Task 
    Resource: arn:...lambda:launch-ffprobe 
    Next: wait-duration 
    } 
    wait-duration: { 
    Type: Task 
    Resource: arn...activity:video-duration 
    End: true 
    } 
} 

Фоновая работник псевдокод:

statemap = dict/map filename to result 

thread1: 
    loop: 
    taskToken, input = SF.GetActivityTask('video-duration') # long poll 
    sync(key=input.filename, waiter=taskToken) 
thread2: 
    loop: 
    msg = SQS.ReceiveMessage(...) # or poll S3 
    sync(key=msg.filename, duration=msg.result) 

function sync(key, waiter, duration): 
    state = statemap[key] 
    if waiter: 
    state.waiter = waiter 
    if duration: 
    state.duration = duration 
    if state.waiter and state.duration: 
    SF.SendTaskSuccess(state.waiter, state.duration) 

S3 триггер псевдокод:

if filename is video: 
    SF.StartExecution(...) 
else if filename is duration: 
    content = S3.GetObject(filename) 
    SQS.SendMessage(queue, content) 
0

Ну, я бы вдохновить себя от https://aws.amazon.com/blogs/compute/implementing-serverless-manual-approval-steps-in-aws-step-functions-and-amazon-api-gateway/

Вы можете заменить шлюз API в этом с помощью функции AWS Lambda, вызванной, например, событием S3 (Документация: http://docs.aws.amazon.com/lambda/latest/dg/with-s3.html). Просто убедитесь, что ваша задача имеет соответствующий тайм-аут.

0

Когда вы отправляете запрос на перегрузку, сохраните taskToken для шага в s3 с помощью предсказуемого ключа на основе загруженного файла. Например, если файл мультимедиа находится в 's3: //my-media-bucket/foobar/media-001.mp3', вы можете сделать файл JSON, содержащий маркер задачи текущего шага, и сохранить его с тем же ключом в другом ковше, например 's3: //ffprobe-tasks/foobar/media-001.mp3.json'. В конце вашего шага, который отправляет носитель на transloadit , не следует выполнить успешный или неудачный вызов на шаге - оставьте его в рабочем состоянии.

Затем, когда вы получите уведомление s3 о том, что результат transloadit готов, вы можете определить ключ s3, чтобы получить токен задачи ('s3: // ffprobe-tasks/foobar/media-001.mp3 '), загрузите JSON (и удалите его из s3) и отправьте успех для этой задачи. Функция шага будет продолжать следующее состояние в выполнении.