2017-02-02 4 views
0

Я запускаю 3 процесса Spark Streaming одновременно с EMR-кластером Amazon. Проблема состоит в том, что одна из этих Потоковые заданий три Спарк делает обработку на основе toLocalIterator:Выполнение трех параллельных заданий Spark Streaming

dstreamdata.foreachRDD(entry => { 
     entry.toLocalIterator.foreach 

Я заметил, что она застревает (похоже, что не хватает ресурсов или около того), но он не возвращает ошибку, просто не обрабатывает данные.

Я использую параметры folloiwng из spark-submit для каждого задания:

spark-submit --deploy-mode cluster --executor-cores 6 --executor-memory 10g --num-executors 2 --conf spark.yarn.submit.waitAppCompletion=false --queue queue_name_of_spark_job 

Любая идея, как решить эту проблему без изменения кода?

ответ

1

1.1) Если вы используете Kinesis в качестве очереди, убедитесь, что у вас в два раза больше ядер-исполнителей, чем у Kinesis shards. Это может относиться к Kafka, я забываю, как работает соединитель kafka. Это связано с тем, что разъем потребляет одно ядро ​​на каждый осколок, поэтому вы должны убедиться, что у вас есть доступные ядра-исполнители для фактической обработки данных.

В прошлом я использовал одного исполнителя в каждом кинезианском осколке, каждый из которых имел 2 или более ядра, которые хорошо работали в моих прецедентах.

1.2) В настоящий момент ваш код извлекает все данные обратно в качестве итератора. Если у вас много данных, вам может потребоваться выделить больше ресурсов для драйвера, чтобы он мог обрабатывать все данные в RDD. Этот вид кажется немного неправильным: - если вы можете поместить все данные в один экземпляр, вам не нужна сложность Spark!

Spark 2.0.x Configuration предоставляет информацию о конфигурации, доступной для вас.

Я рекомендую посмотреть на driver.cores и/или driver.memory для начала. Я подозреваю, что вам нужно больше ядер, но вам нужно будет экспериментировать.

2) Я ценю, что вы не хотите менять код, но ... Если возможно, вы можете использовать entry.foreachPartition().

Этот подход позволяет избежать проблем с производительностью обработки всех данных в процессе драйвера. Это или некоторые варианты логики должны помочь вам решить вашу проблему, в зависимости от вашего конкретного варианта использования.

Вот некоторые примеры кода со ссылкой на дополнительную информацию:

dstream.foreachRDD { rdd => 
    // code here is executed by the driver 
    rdd.foreachPartition { partitionOfRecords => 
    // code here is executed by the workers per partition 
    } 
} 

http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

+0

Я использую Кафка и каждый из трех Спарков Потоковых заданий читает из различной очереди Кафки. Может быть, это причина? – Dinosaurius

+0

Возможно, но я не знаю, как разъем kafka потребляет ресурсы. Но вам нужно будет исследовать журналы. Это могут быть исполнители, голодные из ресурсов или драйвера. Попытайтесь выяснить, какой из них может быть. – ImDarrenG