Я пытаюсь создать потоковое приложение потока данных, которое считывает события из Pub/Sub и записывает их в BigQuery.Google Pub/Sub для потока данных, избегайте дубликатов с идентификатором записи
Согласно документации, Dataflow может обнаруживать дубликаты доставки сообщений, если используется Индентификационный (см: https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids)
Но даже используя этот идентификатор записи, я до сих пор есть некоторые дубликаты (около 0,0002%).
Я что-то пропустил?
EDIT:
Я использую Spotify Async PubSub Client публиковать сообщения со следующим snipplet:
Message
.builder()
.data(new String(Base64.encodeBase64(json.getBytes())))
.attributes("myid", id, "mytimestamp", timestamp.toString)
.build()
Затем я использую Spotify scio, чтобы прочитать сообщение от паба/подразделам и сохранить его в DataFlow:
val input = sc.withName("ReadFromSubscription")
.pubsubSubscription(subscriptionName, "myid", "mytimestamp")
input
.withName("FixedWindow")
.withFixedWindows(windowSize) // apply windowing logic
.toWindowed // convert to WindowedSCollection
//
.withName("ParseJson")
.map { wv =>
wv.copy(value = TableRow(
"message_id" -> (Json.parse(wv.value) \ "id").as[String],
"message" -> wv.value)
)
}
//
.toSCollection // convert back to normal SCollection
//
.withName("SaveToBigQuery")
.saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND)
Размер окна составляет 1 минуту.
После нескольких секунд ввода сообщений у меня уже есть дубликаты в BigQuery.
Я использую этот запрос для подсчета дубликатов:
SELECT
COUNT(message_id) AS TOTAL,
COUNT(DISTINCT message_id) AS DISTINCT_TOTAL
FROM my_dataset.my_table
//returning 273666 273564
И этот, чтобы посмотреть на них:
SELECT *
FROM my_dataset.my_table
WHERE message_id IN (
SELECT message_id
FROM my_dataset.my_table
GROUP BY message_id
HAVING COUNT(*) > 1
) ORDER BY message_id
//returning for instance:
row|id | processed_at | processed_at_epoch
1 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410367 { ...json1... }
2 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410368 { ...json1... }
3 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973907 { ...json2... }
4 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973741 { ...json2... }
5 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:09:10 UTC 1486044550489 { ...json3... }
6 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:08:52 UTC 1486044532680 { ...json3... }
Можете ли вы рассказать о том, как вы используете идентификаторы записей и измерения дубликатов? Обратите внимание на документацию, в которой «Dataflow не выполняет эту дедупликацию для сообщений с одинаковым значением идентификатора записи, которые публикуются в Pub/Sub более чем за 10 минут». Может ли это быть причиной наблюдаемых дубликатов? –
Я добавил дополнительную информацию :) –