2017-01-22 9 views
0

Я хочу понять, когда метод processRecords от IRecordProcessor вызывается из рабочего. Если мой предыдущий вызов processRecords еще не завершен, рабочий вызовет следующий processRecords? Будет ли рабочий запускать новые записи из кинезиса или он будет ждать окончания выполнения текущих записей.kinesis client worker logic

В принципе, я хочу долго ждать, если processRecords получает какое-то исключение, сохраняя записи во внешнем db, поскольку db не работает или какая-либо другая ошибка. Так что хотите подтвердить, что не будет никаких проблем в том случае, если рабочий не начнет получать новые записи до тех пор, пока ранее не завершит обработку?

ответ

0

Отрывок из других вопросов:

приложения (с помощью KCL) будет продолжать опрашивать «осколок итератора» в фоновом режиме, таким образом, вы будете получать уведомления о новых данных , когда речь идет о ,

Источник: https://stackoverflow.com/a/35582161/1622134

А также, по "рабочему" вы имеете в виду "Рабочий" потока в приложении; который является управляемым.

Каждый осколок обрабатывается ровно один KCL работника и имеет ровно один соответствующая запись процессора, поэтому вам не нужно несколько экземпляров обрабатывать один осколок. См. Класс Worker.java в источнике KCL.

Источник: https://stackoverflow.com/a/34509567/1622134

ответить вам вопрос, вы можете, что в вашем processRecords реализации. Во время обработки записей используйте блок try-catch и записывайте контрольную точку в DynamoDB тогда и только тогда, когда часть попытки выполнена успешно. Сюда; если во время записи на внешний db есть ошибка, вы не потеряете записи и после перезагрузки. Вы также должны сохранить эти данные записи (которые не могут быть вставлены в db) в другое место для последующей обработки.

Также см этот ответ: https://stackoverflow.com/a/32517002/1622134

+0

В worker.java, он вызывает runProcessLoop и в том, что он вызывает shardConsumer.consumeShard() там называет checkAndSubmitNextTask() в том, что он проверяет readyForNextTask или нет. Если notReady, он не использует новые записи. Итак, как это возможно, рабочий получает новые записи без предыдущего процесса обработки документов. – user1846749

+0

Если на вашей стороне есть временное отключение db (что предотвращает использование записей потребления); вы должны остановить свое потребительское приложение Kinesis до тех пор, пока оно не будет исправлено. Или есть второй подход: в заключительной ссылке моего ответа есть строка, объясняющая ваш вопрос: «Но если это не удается, обратите внимание на другое место, чтобы исследовать причину, по которой это не удалось». - Таким образом, вы можете обрабатывать записи, потребляемые во время отключения db позже, вручную. – az3

 Смежные вопросы

  • Нет связанных вопросов^_^