2017-02-21 22 views
1

У меня есть следующий искровой простой пример:Искры, некоторые операции выполняются до того, как определено действие?

#1 val lines: RDD[String] = sc.textFile("/data/non_existing_file.txt") 
#2 val words: RDD[String] = lines.flatMap(line => line.split(" ")) 
#3 val pairs: RDD[(String, Int)] = words.map(word => (word, 1)) 
#4 val counts: RDD[(String, Int)] = pairs.reduceByKey(_ + _) 
#5 counts.saveAsTextFile("/tmp/result") 

Когда я запускаю программу, я получаю исключение Input path does not exist: file:/data/non_existing_file.txt", как и ожидалось.

Что является обратным, так это то, что я получаю это исключение в строке # 4. Я понимаю, что я не получаю эту ошибку в строке # 1, # 2 или # 3, потому что вычисление еще не выполнено. Вычисление выполняется только в строке # 5, когда у меня есть действие для записи результата в файл. Итак, почему я получаю исключение в строке # 4 вместо строки # 5?

ответ

1

Это происходит при двух условиях:

  • spark.default.parallelism не установлен.
  • Вы обеспечиваете ни Partitioner, ни количество разделов для reduceByKey

В этом случае reduceByKey охотно создает HashPartitioner с числом разделов, равным числу разбиений родителя RDD. Чтобы получить номер раздела, он должен вычислить входные расщепления. Это требует наличия файла в пути ввода-файла, который, кажется, отсутствует, следовательно, ошибка.

Фактический reduceByKey Операция будет выполняться только после вызова вызова.

Это очень похожая проблема для Why does sortBy transformation trigger a Spark job?