2016-07-05 3 views
1

У меня есть информация о конфигурации, записанная в Zookeeper. Я использую Apache Curator для чтения конфигурации (если есть лучшее решение для ее чтения, я рад ее использовать) с Watcher Watcher, поэтому, если конфигурация будет изменена в Zookeeper, я получу новую. Я использую эту конфигурацию в Spark. Как я могу поделиться им со всеми исполнителями искры одного и того же приложения?Совместное использование конфигурации Zookeeper для нескольких исполнителей искры

Спасибо!

LE:

Спасибо Dikei,

В следующем коде, где бы вы сделали реализацию наблюдатела? Я новичок в искры, и я не совсем уверен, что касается каждого рабочего.

Спасибо!

final JavaDStream<ElementMessage> nodeMessageStream = mapWithStateDistinctAndFiltered.flatMap(pair -> pair._2.buildElementMessages()) 
      .filter(f -> f != null); 

    nodeMessageStream.foreachRDD(rdd -> { 
     rdd.foreachPartition(r -> { 
      final ElementRecordRestClient rest = new ElementRecordRestClient(
        startProps.getProperty(InputPropertyKey.WEPAPP_URL.toString())); 
      r.forEachRemaining(message -> { 
       rest.createObject(message.toElementRecord()); 
      }); 
     }); 
    }); 
+0

Благодарим вас за ответ. Я отредактировал начальную запись. – Vlad

ответ

0

Что я буду делать в этом случае, чтобы запустить Куратор Watcher на главном узле, и транслировать конфигурацию всех исполнителю, используя переменную широковещательный искру в. Всякий раз, когда конфигурация изменилась, вы останавливаете текущий контекст потоковой передачи и запускаете новую с новой конфигурацией. Это гарантирует, что ваш результат всегда согласован.

Другой способ - считывать конфигурацию zookeeper внутри функции лямбда foreachPartition. Но поскольку конфигурация читается независимо каждым разделом, разные разделы одного и того же RDD могут иметь разные конфигурации, что может быть не так, как вы ожидали.

+0

Интересное решение, первое, но остановка и начало потока беспокоит меня. Что это значит точно, как я могу остановить и запустить поток (кроме ручного)? Спасибо! – Vlad

+0

Вы должны сделать это вручную, вызывая 'stop' в текущем объекте контекста, чтобы остановить обработку. Затем создайте новый потоковый контекст и запустите его, вызвав 'start'. Если ваш исходный источник работает как постоянная очередь, то при запуске нового потокового контекста он возобновится, когда предыдущий контекст остановился. – Dikei

+0

Благодарим за помощь. – Vlad

 Смежные вопросы

  • Нет связанных вопросов^_^