2017-02-01 5 views
0

Я использую Spark 1.6.0 с Cloudera 5.8.3.
У меня есть DStream объект и множество преобразований, определенных на нем,В Spark Streaming, есть ли способ обнаружить, когда партия закончилась?

val stream = KafkaUtils.createDirectStream[...](...) 
val mappedStream = stream.transform { ... }.map { ... } 
mappedStream.foreachRDD { ... } 
mappedStream.foreachRDD { ... } 
mappedStream.map { ... }.foreachRDD { ... } 

Есть ли способ, чтобы зарегистрировать последний foreachRDD, который гарантированно будет выполнена последней, и только если вышеуказанные foreachRDD s закончили выполнение?
Другими словами, когда пользовательский интерфейс Spark показывает, что задание было завершено - вот когда я хочу выполнить легкую функцию.

Есть ли что-то в API, которое позволяет мне это достичь?

Благодаря

+0

Просто интересно, можете ли вы перестроить код таким образом, чтобы вся логика, выполняемая над 'mappedStream', выполнялась в одном одиночном foreachRDD, включая карту на последней строке? Просто 'foreachRDD' не является преобразованием. А карта на DStream эквивалентна созданию карты на каждом rdd в foreachRDD? Если это имеет смысл? – ImDarrenG

+0

В конце дня я мог бы, возможно, сузить его до одного 'foreachRDD', а затем« last 'foreachRDD» было бы довольно просто реализовать. Тем не менее, у меня есть несколько источников с разными конвейерами данных, определенными для них. Реализация имеет основной класс, который считывает конфигурацию и создает источники соответственно (источники Kafka действительно), и каждый источник проходит через другой конвейер данных - то есть класс, который определяет, как должен обрабатываться поток. Я хочу обнаружить готовые партии в основном классе, независимо от преобразований, определенных в классах конвейера. –

ответ

4

Использование потокового слушателей должны решить эту проблему для вас:

(жаль, что ява пример)

ssc.addStreamingListener(new JobListener()); 

// ... 

class JobListener implements StreamingListener { 

@Override 
    public void onBatchCompleted(StreamingListenerBatchCompleted batchCompleted) { 

     System.out.println("Batch completed, Total delay :" + batchCompleted.batchInfo().totalDelay().get().toString() + " ms"); 

    } 

    /* 

    snipped other methods 

    */ 


} 

https://gist.github.com/akhld/b10dc491aad1a2007183

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-streaming/spark-streaming-streaminglisteners.html

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener

+0

Точно, что я искал, спасибо! –

+0

Не беспокойтесь. Было бы интересно услышать, как вы справляетесь, для дальнейшего использования. Только если вы получаете запасную минуту: o) – ImDarrenG