2016-04-05 2 views
0

Я работаю над приложением, которое считывает и обрабатывает события из потока AWS Kinesis Stream с помощью клиентской библиотеки Kinesis (KCL). Я не хочу, чтобы сторона, производящая события, страдала латентностью, поэтому KinesisAsyncClient использовался для отправки событий. Однако для того, чтобы моя обработка событий работала правильно, мне нужно обработать evens в «порядке, который я назвал putRecordAsync» на моей стороне производителя. Эта информация доступна как поле метки времени внутри каждой записи Kinesis.Как изменить порядок асинхронно отправленных событий Kinesis внутри KCL

Помимо переключения на использование синхронного клиента Kinesis с блокировкой, существует ли другое решение, позволяющее эффективно сортировать потоковые события?

+0

Читатель будут получать события в том порядке, в котором они находятся в потоке. Вы можете контролировать порядок событий со стороны производителя. Вы можете использовать putRecords (обратите внимание на s), который упорядочивает несколько событий. Вы также можете использовать seq-id предыдущего события, чтобы поместить следующее событие за ним. Это имеет смысл для вас? – Guy

+0

@Guy Does putRecordsAync с помощью Async Client от Kinesis также гарантирует порядок событий? – SuSanD

+0

Призыв API к Kinesis с помощью PutRecords (синхронизация или асинхронный) сохраняет порядок событий в полезной нагрузке вызова. Если вы делаете в клиентском коде дополнительную партию для создания другого вызова API, эти события могут быть не в порядке вы вызвали асинхронный вызов в нескольких вызовах. – Guy

ответ

0

При заказе важно, не использовать асинхронный клиент.

Асинхронный клиент просто использует пул потоков под обложками для совершения одинаковых вызовов - поскольку он многопоточен, вы не можете гарантировать порядок выполнения этих потоков, и в результате у вас нет контроля над порядком тех записи получены Кинезисом.

Теперь, если задержка действительно является проблемой для производителя:

  1. Убедитесь, что вы звоните PutRecords (вместо PutRecord), где это возможно - это, безусловно, сэкономит вам несколько сетевых круглых поездок.

  2. Вместо прямого вызова клиента просто поместите записи в порядке очереди в локальную очередь и используйте фоновый поток для регулярного опроса из этой очереди для вызова PutRecords.

Некоторые другие вещи, чтобы иметь в виду - если это не достаточно быстро, чтобы сохранить вашу очередь в процессе близко опорожнить, что указывает на наличие достаточно большой пропускной способности, что вам нужно несколько потоков ввода данных, и у вас больше нет точного заказа. Если это так, я настоятельно рекомендую предоставить порядковые номера с вашими записями, чтобы вы могли переупорядочить их, если это необходимо на стороне потребителя (также учитывайте SQS как отправную точку вместо Kinesis в этом случае)