12

Описание

У нас есть искра Streaming 1.5.2 приложения в Scala, который читает JSON событие из Kinesis потока, делают некоторые преобразования/агрегирование и записывают результаты разные префиксы S3. Текущий интервал между партиями составляет 60 секунд. У нас 3000-7000 событий/сек. Мы используем контрольную точку, чтобы защитить нас от потери агрегатов.вопросов надежности с чекпойнтингом/WAL искрового Streaming 1.6.0

Это хорошо работает некоторое время, восстанавливаясь из-за исключений и даже перезапуска кластера. Недавно мы перекомпилировали код Spark Streaming 1.6.0, изменив только зависимости библиотеки в файле build.sbt. После запуска кода в кластере Спарк 1.6.0 в течение нескольких часов, мы заметили следующее:

  1. «Input Rate» и «Время обработки» волатильность существенно возросла (см скриншоты ниже) в 1,6 раза. 0.
  2. Каждые несколько часов в «WriteAheadLog» вызывается «Исключение, созданное при записи записи: BlockAdditionEvent ...». java.util.concurrent.TimeoutException: фьючерсы, истекающие после исключения [5000 миллисекунд] "(см. полную трассировку стека ниже), совпадающие с падением до 0 событий/сек для определенных партий (минут).

После некоторого рытья, я думаю, что вторая проблема связана с этим Pull Request. Первоначальная цель PR: «При использовании S3 в качестве каталога для WAL, записи занимают слишком много времени. Водитель получает очень легко узкое место, когда несколько приемников отправляют события AddBlock в ReceiverTracker. Этот PR добавляет группировку событий в ReceivedBlockTracker, так что приемники не слишком долго блокируются драйвером. »

Мы являемся контрольно-пропускной точкой на S3 в Spark 1.5.2, и проблем с производительностью и надежностью нет. Мы проверили контрольную точку в Spark 1.6.0 на S3 и местном NAS, и в обоих случаях мы получаем это исключение. Похоже, когда на контрольную точку партии требуется более 5 секунд, это исключение возникает, и мы проверили, что события для этой партии теряются навсегда.

Вопросы

  • увеличение «Input Rate» и «Время обработки» летучести Ожидается ли в Спарк Streaming 1.6.0 и есть ли известный способ улучшения его?

  • Вы знаете какой-либо обходной путь, кроме этих 2 ?:

    1) Для того, чтобы гарантировать, что она занимает менее 5 секунд для контрольных точек раковине, чтобы записать все файлы. По моему опыту, вы не можете гарантировать это с помощью S3 даже для небольших партий. Для локального NAS это зависит от того, кто отвечает за инфраструктуру (сложный с облачными провайдерами).

    2) Увеличьте значение свойства spark.streaming.driver.writeAheadLog.batchingTimeout.

  • Ожидаете ли вы потерять какие-либо события в описанном сценарии? Я думаю, что если пакетная контрольная точка не сработает, номера последовательностей осколков/приемников не будут увеличены, и это будет повторено позднее.

Spark 1.5.2 Статистика - Скриншот

enter image description here

Спарк 1.6.0 Статистика - Скриншот

enter image description here

Полный Трассировка стека

16/01/19 03:25:03 WARN ReceivedBlockTracker: Exception thrown while writing record: BlockAdditionEvent(ReceivedBlockInfo(0,Some(3521),Some(SequenceNumberRanges(SequenceNumberRange(StreamEventsPRD,shardId-000000000003,49558087746891612304997255299934807015508295035511636018,49558087746891612304997255303224294170679701088606617650), SequenceNumberRange(StreamEventsPRD,shardId-000000000004,49558087949939897337618579003482122196174788079896232002,49558087949939897337618579006984380295598368799020023874), SequenceNumberRange(StreamEventsPRD,shardId-000000000001,49558087735072217349776025034858012188384702720257294354,49558087735072217349776025038332464993957147037082320914), SequenceNumberRange(StreamEventsPRD,shardId-000000000009,49558088270111696152922722880993488801473174525649617042,49558088270111696152922722884455852348849472550727581842), SequenceNumberRange(StreamEventsPRD,shardId-000000000000,49558087841379869711171505550483827793283335010434154498,49558087841379869711171505554030816148032657077741551618), SequenceNumberRange(StreamEventsPRD,shardId-000000000002,49558087853556076589569225785774419228345486684446523426,49558087853556076589569225789389107428993227916817989666))),BlockManagerBasedStoreResult(input-0-1453142312126,Some(3521)))) to the WriteAheadLog. 
java.util.concurrent.TimeoutException: Futures timed out after [5000 milliseconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
    at scala.concurrent.Await$.result(package.scala:107) 
    at org.apache.spark.streaming.util.BatchedWriteAheadLog.write(BatchedWriteAheadLog.scala:81) 
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.writeToLog(ReceivedBlockTracker.scala:232) 
    at org.apache.spark.streaming.scheduler.ReceivedBlockTracker.addBlock(ReceivedBlockTracker.scala:87) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker.org$apache$spark$streaming$scheduler$ReceiverTracker$$addBlock(ReceiverTracker.scala:321) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1$$anonfun$run$1.apply$mcV$sp(ReceiverTracker.scala:500) 
    at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1230) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$receiveAndReply$1$$anon$1.run(ReceiverTracker.scala:498) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Исходный код Extract

... 
    // Function to create a new StreamingContext and set it up 
    def setupContext(): StreamingContext = { 
    ... 
    // Create a StreamingContext 
    val ssc = new StreamingContext(sc, Seconds(batchIntervalSeconds)) 

    // Create a Kinesis DStream 
    val data = KinesisUtils.createStream(ssc, 
     kinesisAppName, kinesisStreamName, 
     kinesisEndpointUrl, RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName(), 
     InitialPositionInStream.LATEST, Seconds(kinesisCheckpointIntervalSeconds), 
     StorageLevel.MEMORY_AND_DISK_SER_2, awsAccessKeyId, awsSecretKey) 
... 
    ssc.checkpoint(checkpointDir) 

    ssc 
    } 


    // Get or create a streaming context. 
    val ssc = StreamingContext.getActiveOrCreate(checkpointDir, setupContext) 

    ssc.start() 
    ssc.awaitTermination() 
+3

хорошее описание +1 –

+0

межжала. Вы пытались уменьшить размер партии? Как вы защищаетесь от повторной попытки производителя? –

+0

Это помогло бы, если бы у нас не было ограничений на выходные: мы пишем S3, разные обработанные RDD. При увеличении количества рабочих, обработки и вывода уменьшается, а также увеличивается стоимость. Помогает играть с соотношением разделов на одного работника. Тюнинг spark.streaming.blockInterval помогает, поскольку он позволяет вам косвенно контролировать количество разделов, не вызывая перестановки, которые вы получаете с помощью repartition() на уровне DStream. coalesce(), на уровне RDD, также помогает. Что вы подразумеваете под «защитой от повторной попытки производителя»? Обеспечение точно-семантики доставки на выходе для предотвращения дублирования? – MiguelPeralvo

ответ

4

По предложению zero323 «s о публикации мой комментарий как ответ:

Повышение spark.streaming.driver.writeAheadLog.batchingTimeout решается вопрос чекпойнтинг тайм-аут. Мы сделали это, убедившись, что у нас есть место для этого. Некоторое время мы тестировали его. Поэтому я рекомендую увеличить его после тщательного рассмотрения.

ДЕТАЛИ

Мы использовали эти 2 настройки в $ SPARK_HOME/CONF/искровым defaults.conf:

spark.streaming.driver.writeAheadLog.allowBatching правда spark.streaming.driver.writeAheadLog .batchingTimeout 15000

Первоначально у нас только был spark.streaming.driver.writeAheadLog.allowBatching установлен в true.

Перед изменением мы воспроизвели проблему, упомянутую в вопросе («... ReceivedBlockTracker: Исключение, созданное при записи записи ...») в тестовой среде. Это происходило каждые несколько часов. После изменения проблема исчезла. Мы запустили его несколько дней, прежде чем перейти к производству.

Мы обнаружили, что getBatchingTimeout() method of the WriteAheadLogUtils класс имел значение по умолчанию 5000 мс, как показано здесь:

def getBatchingTimeout(conf: SparkConf): Long = { 
    conf.getLong(DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY, defaultValue = 5000) 
} 

 Смежные вопросы

  • Нет связанных вопросов^_^