2016-12-23 5 views
1

Я использую новый (и помеченный «альфа») Structured Streaming of Spark 2.0.2 для чтения сообщений из темы кафки и обновления пары Cassandra столы из него:Spark структурированный дымящийся от kafka - последнее сообщение обработано снова после возобновления с контрольной точки

val readStream = sparkSession.readStream 
    .format("kafka") 
    .option("subscribe", "maxwell") 
    .option("kafka.bootstrap.servers", "localhost:9092") 
    .load 
    .as[KafkaMessage] 
    .map(<transform KafkaMessage to Company>) 

val writeStream = readStream 
    .writeStream 
    .queryName("CompanyUpdatesInCassandra") 
    .foreach(new ForeachWriter[Company] { 
    def open(partitionId: Long, version: Long): Boolean = { 
     true 
    } 

    def process(company: Company): Unit = { 
     ... 
    } 

    def close(errorOrNull: Throwable): Unit = {} 
    } 
    .start 
    .awaitTermination 

Я также настроил контрольно-пропускной пункт расположение ("spark.sql.streaming.checkpointLocation") на sparkSession. Это позволяет мне получать сообщения, которые были получены, когда потоковое приложение было отключено, как только оно возобновится.

Однако, поскольку я настраивал это местоположение контрольной точки, я заметил, что при возобновлении он также последовательно обрабатывает последнее сообщение предыдущей партии, даже если оно было обработано правильно без сбоев.

Любая идея, что я делаю неправильно здесь? Это кажется очень распространенным вариантом использования.

Подробнее:

Смотрите здесь соответствующие журналы (тема 5876 является последней темой, которая была успешно обрабатывается предыдущей партии):

[INFO] 12:44:02.294 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming streaming query, starting with batch 31 
[DEBUG] 12:44:02.297 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Found possibly uncommitted offsets {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]} 
[DEBUG] 12:44:02.300 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Resuming with committed offsets: {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} 
[DEBUG] 12:44:02.301 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Stream running from {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5876)]} to {KafkaSource[Subscribe[maxwell]]: [(maxwell-0,5877)]} 
[INFO] 12:44:02.310 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch called with start = Some([(maxwell-0,5876)]), end = [(maxwell-0,5877)] 
[INFO] 12:44:02.311 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Partitions added: Map() 
[DEBUG] 12:44:02.313 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: TopicPartitions: maxwell-0 
[DEBUG] 12:44:02.318 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Sorted executors: 
[INFO] 12:44:02.415 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: GetBatch generating RDD of offset range: KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None) 
[DEBUG] 12:44:02.467 [stream execution thread for CompanyUpdatesInCassandra] org.apache.spark.internal.Logging$class: Retrieving data from KafkaSource[Subscribe[maxwell]]: Some([(maxwell-0,5876)]) -> [(maxwell-0,5877)] 
[DEBUG] 12:44:09.242 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Creating iterator for KafkaSourceRDDOffsetRange(maxwell-0,5876,5877,None) 
[INFO] 12:44:09.879 [Executor task launch worker-0] biz.meetmatch.streaming.CompanyUpdateListener$$anon$1: open (partitionId:0, version:31) 
[DEBUG] 12:44:09.880 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Get spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 nextOffset -2 requested 5876 
[INFO] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Initial fetch for maxwell-0 5876 
[DEBUG] 12:44:09.881 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Seeking to spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor maxwell-0 5876 
[DEBUG] 12:44:10.049 [Executor task launch worker-0] org.apache.spark.internal.Logging$class: Polled spark-kafka-source-369ee4c4-12a1-4b23-b15f-138a7b39b118--1422895500-executor [maxwell-0] 1 

Кроме того, когда я убить поток, я делаю что она останавливается плавно, чтобы избежать потерь данных:

sys.ShutdownHookThread 
{ 
    writeStream.stop 
    sparkSession.stop 
} 

ответ

3

в настоящее время Структурированная Streaming ориентиров состояния, когда новое смещение генерируется. Таким образом, ожидаемый случай, последний завершенный пакет может быть переработан после восстановления. Однако это внутренняя реализация. Предположим, что если мы выполняем контрольную точку при выполнении пакета, все еще возможно, что контрольная точка не удалась, и ваш приемник ForeachWriter также должен обрабатывать этот случай.

Как правило, ваша раковина всегда должна быть идемпотентной.

Обновлено: в Spark 2.2.0, Structured Streaming не запускает пакет после восстановления, если он был успешным.

+0

я вижу. У меня создалось впечатление, что последняя партия будет перерабатываться только после того, как что-то действительно пошло не так между выполнением партии и выполнением контрольной точки. Но на самом деле это не так уж и важно, поскольку ForeachWriter должен быть идемпотентом. Благодаря! –

+1

Теперь это просто упрощение внутренних элементов (мы отмечаем пакет как полный, начиная со следующего). Я думаю, что, скорее всего, мы оптимизируем это в будущем. Однако, как вы сказали, вы все равно должны сделать свой идемпотент Writer, если вам небезразлична семантика. –

 Смежные вопросы

  • Нет связанных вопросов^_^