1

Я в основном хочу написать обратный вызов события в моей программе драйверов, которая будет перезапускать искровое потоковое приложение по прибытии этого события. Моя программа-драйвер настраивает потоки и логику выполнения, читая конфигурации из файла. Всякий раз, когда файл был изменен (новые конфиги добавлено) программа водитель должен выполнить следующие шаги в последовательности,Каков наилучший способ перезапуска приложения с искровым потоком?

  1. Restart,
  2. Читать конфигурационный файл (как часть основного метода) и
  3. Настроить потоки

Каков наилучший способ достичь этого?

ответ

0

я в настоящее время решить эту проблему следующим образом,

  • Прослушивание внешних событий, подписавшись на тему MQTT

  • В обратном вызове MQTT, остановить контекст потокового ssc.stop(true,true) который будет корректно завершать работу потоки и лежащие в основе spark config

  • Запустите искровое приложение снова, создав свечу conf и установка u р потоки, читая конфигурационный файл

// Contents of startSparkApplication() method 
sparkConf = new SparkConf().setAppName("SparkAppName") 
ssc = new StreamingContext(sparkConf, Seconds(1)) 
val myStream = MQTTUtils.createStream(ssc,...) //provide other options 
myStream.print() 
ssc.start() 

Приложение построено в качестве приложения Spring загрузки

0

Лучший способ перезагрузить Spark на самом деле соответствует вашей среде. Но всегда можно использовать консоль spark-submit.

Вы можете создать процесс spark-submit, как и любой другой процесс linux, положив его на задний план в shell. В вашем случае работа spark-submit фактически запускает драйвер на YARN, так что это ребенок, который выполняет асинхронный процесс на другом компьютере через YARN.

Cloudera blog

0

Одним из способов, которые мы исследовали в последнее время (в искровой Meetup здесь) было добиться этого с помощью Zookeeper в тандеме с искрой. Это в двух словах использует Apache Curator для наблюдения за изменениями в Zookeeper (изменения в конфигурации ZK могут быть вызваны вашим внешним событием), что приводит к перезапуску слушателя.

Основание ссылочного кода here, вы обнаружите, что изменение конфигурации приводит к перезагрузке Watcher (искрового потокового приложения) после изящного отключения и перезагрузки изменений. Надеюсь, это указатель!

1

В некоторых случаях может потребоваться перезагрузка потокового контекста динамически (например, для перезагрузки потоковых операций). В этих случаях вы можете (Scala пример):

val sparkContext = new SparkContext() 

val stopEvent = false 
var streamingContext = Option.empty[StreamingContext] 
val shouldReload = false 

val processThread = new Thread { 
    override def run(): Unit = { 
    while (!stopEvent){ 
     if (streamingContext.isEmpty) { 

     // new context 
     streamingContext = Option(new StreamingContext(sparkContext, Seconds(1))) 

     // create DStreams 
      val lines = streamingContext.socketTextStream(...) 

     // your transformations and actions 
     // and decision to reload streaming context 
     // ... 

     streamingContext.get.start() 
     } else { 
     if (shouldReload) { 
      streamingContext.get.stop(stopSparkContext = false, stopGracefully = true) 
      streamingContext.get.awaitTermination() 
      streamingContext = Option.empty[StreamingContext] 
     } else { 
      Thread.sleep(1000) 
     } 
     } 

    } 
    streamingContext.get.stop(stopSparkContext =true, stopGracefully = true) 
    streamingContext.get.awaitTermination() 
    } 
} 

// and start it in separate thread 
processThread.start() 
processThread.join() 

или питон:

spark_context = SparkContext() 

stop_event = Event() 
spark_streaming_context = None 
should_reload = False 

def process(self): 
    while not stop_event.is_set(): 
     if spark_streaming_context is None: 

      # new context 
      spark_streaming_context = StreamingContext(spark_context, 0.5) 

      # create DStreams 
      lines = spark_streaming_context.socketTextStream(...) 

      # your transformations and actions 
      # and decision to reload streaming context 
      # ... 

      self.spark_streaming_context.start() 
     else: 
      # TODO move to config 
      if should_reload: 
       spark_streaming_context.stop(stopSparkContext=False, stopGraceFully=True) 
       spark_streaming_context.awaitTermination() 
       spark_streaming_context = None 
      else: 
       time.sleep(1) 
    else: 
     self.spark_streaming_context.stop(stopGraceFully=True) 
     self.spark_streaming_context.awaitTermination() 


# and start it in separate thread 
process_thread = threading.Thread(target=process) 
process_thread.start() 
process_thread.join() 

Если вы хотите, чтобы ваш код от аварий и перезапустить контекст потокового с последнего места использование checkpointing механизм. Позволяет восстановить состояние задания после сбоя.