0

Я бег искры потоковой с Пряжей с -Спарка залитой пряжи: исполнители не в полной мере

spark-submit --master yarn --deploy-mode cluster --num-executors 2 --executor-memory 8g --driver-memory 2g --executor-cores 8 .. 

Я потребляющий Кафку через DireactStream подход (Нет приемника). У меня есть 2 темы (каждая из которых состоит из 3 разделов).

Репарация RDD (у меня есть один DStream) на 16 частей (при условии, что нет исполнителей * несколько ядер = 2 * 8 = 16 Правильно ли это?), А затем я делаю foreachPartition и записываю каждый раздел в локальный файл, а затем отправьте его на другой сервер (не искру) через http (с помощью клиента apache http sync с менеджером пула через почту с несколькими частями).

Когда я проверил детали этого шага (или JOB - это правильное обозначение?) Через интерфейс Spark, он показал, что всего 16 задач выполняются на одном исполнителе с 8 заданиями за раз.

Это Спарк детали пользовательского интерфейса -

Детали для этапа 717 (Попытка 0)

Index ID Attempt Status Locality Level Executor ID/Host Launch Time Duration GC Time Shuffle Read Size/Records Errors 
0 5080 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 313.3 KB/6137 
1 5081 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 328.5 KB/6452 
2 5082 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 324.3 KB/6364 
3 5083 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 321.5 KB/6306 
4 5084 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 324.8 KB/6364 
5 5085 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 320.8 KB/6307 
6 5086 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 2 s 11 ms 323.4 KB/6356 
7 5087 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:46 3 s 11 ms 316.8 KB/6207 
8 5088 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 317.7 KB/6245 
9 5089 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 320.4 KB/6280 
10 5090 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 323.0 KB/6334 
11 5091 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 323.7 KB/6371 
12 5092 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 316.7 KB/6218 
13 5093 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 321.0 KB/6301 
14 5094 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:48 2 s 321.4 KB/6304 
15 5095 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/27 12:11:49 2 s 319.1 KB/6267 

Я ожидал, чтобы выполнить 16 параллельных задач (2 ИСПОЛНИТЕЛЬ * 8 ядро) на любом один или несколько исполнителей. Я думаю, что я чего-то не хватает. Пожалуйста помоги.

Update:

  1. Входящие данные распределены неравномерно. например 1-я тема имеет 2-й раздел с 5 * 5 = 25k сообщений (5k = maxRatePerPartition, 5s = пакетный интервал), а два других раздела имеют почти 0 данных несколько раз. Вторая тема имеет ~ 500-4000 сообщений за партию, которая равномерно распределена по 3 разделам.

  2. когда нет данных в теме 1, тогда я вижу 16 параллельных задач обработки через 2 исполнителя.


Index ID Attempt Status Locality Level Executor ID/Host Launch Time Duration GC Time Shuffle Read Size/Records Errors 
0 330402 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 1 s 19.2 KB/193 
1 330403 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 21.2 KB/227 
2 330404 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 1 s 20.8 KB/214 
3 330405 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 20.9 KB/222 
4 330406 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 2 s 21.0 KB/222 
5 330407 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 20.5 KB/213 
6 330408 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 1 s 20.4 KB/207 
7 330409 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 19.2 KB/188 
8 330410 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 1 s 20.4 KB/214 
9 330411 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 20.1 KB/206 
10 330412 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 0.6 s 18.7 KB/183 
11 330413 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 20.6 KB/217 
12 330414 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 1 s 20.0 KB/206 
13 330415 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 20.7 KB/216 
14 330416 0 SUCCESS NODE_LOCAL 1/executor1_machine_host_name 2016/12/28 04:31:41 1 s 18.8 KB/186 
15 330417 0 SUCCESS NODE_LOCAL 2/executor2_machine_host_name 2016/12/28 04:31:41 1 s 20.4 KB/213 
+0

Pls проверит это [ответ один раз] (http://stackoverflow.com/questions/38465692/spark-coalesce-relationship-with-number-of-executors-and-cores/40410040#40410040). – mrsrinivas

+0

Попробуйте дать '--num-executors 6' (Поскольку у вас есть 2 темы с 3 разделами). ** 1 partitions = 1 исполнитель ** - идеальный выбор.('--executor-cores' будет зависеть от вашей доступности ядра и требуемой распараллеливания в каждом разделе) – mrsrinivas

+0

Я пробовал с 6 исполнителем и 4-мя ядром, но все же вся эта задача выполняется на том же самом исполнителе (теперь 4 за раз) –

ответ

0

Попробуйте увеличить число разделов равно числу ИСПОЛНИТЕЛЬ ядер, так как вы даете 8 ядер исполнителя, увеличить количество разделов на Кафки тему до 8. Кроме того, проверьте, что произойдет, если вы не делаете повторно раздел.

+1

Я сомневаюсь, что это решит мою проблему, так как чтение не является узким местом, это планирование –

0

Набор параметров ниже с --num-исполнителей 6

spark.default.parallelism

spark.streaming.concurrentJobs

Набор выше значений параметров в соответствии с вашими требованиями и окружающей среды. Это будет работать для вас.