2017-02-02 16 views
2

Я написал искровой работу, которая делает ниже операцийСпарк foreachpartition улучшения связи

  1. Читает данные из HDFS текстовых файлов.
  2. Сделайте отдельный() вызов для фильтрации дубликатов.
  3. ли фазы mapToPair и генерировать pairRDD
  4. Выполните reducebykey вызов
  5. сделать агрегация логики для сгруппированных кортежа.
  6. теперь называют Еогеасп на # 5

    здесь это

    1. сделать вызов Кассандры дб
    2. создать AWS SNS и SQS соединение клиента
    3. сделать некоторые JSON записи форматирования.
    4. опубликовать запись в SNS/SQS

когда я запускаю эту работу, он создает три искры этапы

первый этап - это занимает около 45 сек. выполняет отчетливую второго этапа - mapToPair и reducebykey = занимает 1,5 мин

третьей ступени = занимает 19 минут

что я сделал

  1. Я выключил Кассандру вызов так увидеть DB ударила причину - это принимая меньше времени
  2. Оскорблять часть я нашел, чтобы создать SNS/SQS соединения Foreach секционирования

его принятие более чем 60% всего рабочего времени

Я создаю соединение SNS/SQS внутри foreachPartition, чтобы улучшить меньше соединений. у нас есть еще лучший способ

Я не могу создать объект подключения на водителя, как это не сериализации

Я не использую числа исполнителя 9, executore сердечник 15, 2g памяти драйвера, Вершитель память 5G

Я использую 16 сердечник 64 гиг память размера кластера 1 мастер 9 раба все же конфигурацию ЭХ искр развертывания 1,6

+0

Вы уверены, что 'создать AWS SNS и SQS клиента соединение ' берет 60% времени работы или 'опубликовать запись в SNS/SQS' это? Между этими двумя есть небольшая разница. В первом случае вам необходимо свести к минимуму количество создания соединения, тогда как во втором случае вам необходимо распределить свои данные (и создать больше экземпляра подключения). Интересно!!!! – code

+0

Если это второй случай, я отправлю ответ с помощью решения. – code

ответ

1

Это звучит, как вы хотели бы установить в точности одно соединения SNS/SQS для каждого узла, а затем использовать его для обрабатывать все ваши данные на каждом узле.

Я думаю, что foreachPartition - это правильная идея здесь, но вы можете захотеть объединить ваш RDD заранее. Это приведет к срыву разделов на одном узле без перетасовки и позволит вам не запускать дополнительные соединения SNS/SQS.

Смотрите здесь: https://spark.apache.org/docs/latest/api/scala/index.html#[email protected](numPartitions:Int,shuffle:Boolean,partitionCoalescer:Option[org.apache.spark.rdd.PartitionCoalescer])(implicitord:Ordering[T]):org.apache.spark.rdd.RDD[T]

+0

Да, coalesce - это именно мое решение. Еще один момент, который я хочу добавить здесь. У меня было много небольших файлов, таких как 23kb, 45 kb и т. Д., И с coalesce назовите его сжатым до правого раздела, и теперь я могу обрабатывать около 25 гб за 20 минут. Улучшение здесь больше – Sam

+0

Спасибо Брэдли .. еще одна вещь .. это было сказано, мне нужны данные 1 ТБ для обработки того, сколько разделов с объединением я должен создать? – Sam

+0

Поэтому я бы использовал несколько разделов, достаточно больших, чтобы каждый из них вписывался в память или количество ядер, которые у меня есть. Какой бы ни был больше. –