2

Я пытаюсь создать потоковое приложение потока данных, которое считывает события из Pub/Sub и записывает их в BigQuery.Google Pub/Sub для потока данных, избегайте дубликатов с идентификатором записи

Согласно документации, Dataflow может обнаруживать дубликаты доставки сообщений, если используется Индентификационный (см: https://cloud.google.com/dataflow/model/pubsub-io#using-record-ids)

Но даже используя этот идентификатор записи, я до сих пор есть некоторые дубликаты (около 0,0002%).

Я что-то пропустил?

EDIT:

Я использую Spotify Async PubSub Client публиковать сообщения со следующим snipplet:

Message 
     .builder() 
     .data(new String(Base64.encodeBase64(json.getBytes()))) 
     .attributes("myid", id, "mytimestamp", timestamp.toString) 
     .build() 

Затем я использую Spotify scio, чтобы прочитать сообщение от паба/подразделам и сохранить его в DataFlow:

val input = sc.withName("ReadFromSubscription") 
       .pubsubSubscription(subscriptionName, "myid", "mytimestamp") 
input 
    .withName("FixedWindow") 
    .withFixedWindows(windowSize) // apply windowing logic 
    .toWindowed // convert to WindowedSCollection 
    // 
    .withName("ParseJson") 
    .map { wv => 
     wv.copy(value = TableRow(
     "message_id" -> (Json.parse(wv.value) \ "id").as[String], 
     "message" -> wv.value) 
    ) 
    } 
    // 
    .toSCollection // convert back to normal SCollection 
    // 
    .withName("SaveToBigQuery") 
    .saveAsBigQuery(bigQueryTable(opts), BQ_SCHEMA, WriteDisposition.WRITE_APPEND) 

Размер окна составляет 1 минуту.

После нескольких секунд ввода сообщений у меня уже есть дубликаты в BigQuery.

Я использую этот запрос для подсчета дубликатов:

SELECT 
    COUNT(message_id) AS TOTAL, 
    COUNT(DISTINCT message_id) AS DISTINCT_TOTAL 
FROM my_dataset.my_table 

//returning 273666 273564 

И этот, чтобы посмотреть на них:

SELECT * 
FROM my_dataset.my_table 
WHERE message_id IN (
    SELECT message_id 
    FROM my_dataset.my_table 
    GROUP BY message_id 
    HAVING COUNT(*) > 1 
) ORDER BY message_id 

//returning for instance: 
row|id         | processed_at   | processed_at_epoch  
1 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410367 { ...json1... } 
2 00166a5c-9143-3b9e-92c6-aab52601b0be 2017-02-02 14:06:50 UTC 1486044410368 { ...json1... } 
3 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973907 { ...json2... } 
4 00354cc4-4794-3878-8762-f8784187c843 2017-02-02 13:59:33 UTC 1486043973741 { ...json2... } 
5 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:09:10 UTC 1486044550489 { ...json3... } 
6 0047284e-0e89-3d57-b04d-ebe4c673cc1a 2017-02-02 14:08:52 UTC 1486044532680 { ...json3... } 
+0

Можете ли вы рассказать о том, как вы используете идентификаторы записей и измерения дубликатов? Обратите внимание на документацию, в которой «Dataflow не выполняет эту дедупликацию для сообщений с одинаковым значением идентификатора записи, которые публикуются в Pub/Sub более чем за 10 минут». Может ли это быть причиной наблюдаемых дубликатов? –

+0

Я добавил дополнительную информацию :) –

ответ

1

BigQuery documentation states, что могут быть редкие случаи, когда дублирует прибывающим:

  1. «BigQuery запоминает этот идентификатор в течение как минимум одной минуты» - если поток данных занимает более одной минуты, прежде чем повторять попытку Вставка BigQuery может разрешить дублирование. Вы можете посмотреть журналы из конвейера, чтобы определить, так ли это.
  2. «В редких случаях, когда центр обработки данных Google неожиданно потерял связь, автоматическая дедупликация может оказаться невозможной».

Возможно, вы хотите исправить ошибки, связанные с manually removing duplicates. Это также позволит вам увидеть insertID, который использовался с каждой строкой, чтобы определить, была ли проблема на стороне потока данных (генерирование разных insertID s для той же записи) или на стороне BigQuery (не удалось дедуплицировать строки на основе их insertID) ,