2016-11-29 7 views
3

У меня есть ситуация, когда я хочу выполнить системный процесс для каждого рабочего в Spark. Я хочу, чтобы этот процесс запускался на каждом компьютере один раз. В частности, этот процесс запускает демон, который должен выполняться до завершения остальной части моей программы. В идеале это должно выполняться до того, как я прочитал данные.Возможно ли выполнить команду для всех работников в Apache Spark?

Я нахожусь на Spark 2.0.2 и использую динамическое распределение.

+0

Дубликат: http://stackoverflow.com/questions/37343437/how-to-run-a-function-on-all-spark-workers-before-processing-data-in-pyspark –

ответ

5

Возможно, вы сможете достичь этого, используя комбинацию ленивых входов и трансляции Spark. Это будет что-то вроде ниже. (Не компилируется ниже кода, возможно, придется изменить несколько вещей)

object ProcessManager { 
    lazy val start = // start your process here. 
} 

Вы можете транслировать этот объект в начале вашего приложения, прежде чем делать какие-либо преобразования.

val pm = sc.broadcast(ProcessManager) 

Теперь вы можете получить доступ к этому объекту внутри преобразования, как вы делаете с любым другими переменными широковещательным и вызвать ленивый Вал.

rdd.mapPartition(itr => { 
    pm.value.start 
    // Other stuff here. 
} 
+0

Не выполнит ли этот процесс один раз на раздел, а не один раз на одного работника? – Jon

+0

Вы правы, это всего лишь пример. Но поскольку это ленивый val и ProcessManager - это «объект», он запускается только один раз в исполнителе. – Jegan

+0

Вещание этого объекта немного странно. Вы должны передавать данные, а не код. Просто наличие объекта и доступ к стартовой переменной будет достаточно. Таким образом, вам не нужен объект ProcessManager для сериализации. – Atreys

2

object со статической инициализацией, которая вызывает ваш системный процесс, должна сделать трюк.

object SparkStandIn extends App { 
    object invokeSystemProcess { 
    import sys.process._ 
    val errorCode = "echo Whatever you put in this object should be executed once per jvm".! 

    def doIt(): Unit = { 
     // this object will construct once per jvm, but objects are lazy in 
     // another way to make sure instantiation happens is to check that the errorCode does not represent an error 
    } 
    } 
    invokeSystemProcess.doIt() 
    invokeSystemProcess.doIt() // even if doIt is invoked multiple times, the static initialization happens once 
} 
+0

Но как вы гарантируете, что он фактически инициализирован без повторения вызовов на каждое преобразование? –

 Смежные вопросы

  • Нет связанных вопросов^_^