2015-04-30 4 views
18

Как все знают, что разделители в Spark оказывают огромное влияние на любые «широкие» операции, поэтому он обычно настраивается в операциях. Я экспериментировал со следующим кодом:В Apache Spark, почему RDD.union не сохраняет разделитель?

val rdd1 = 
    sc.parallelize(1 to 50).keyBy(_ % 10) 
    .partitionBy(new HashPartitioner(10)) 
val rdd2 = 
    sc.parallelize(200 to 230).keyBy(_ % 13) 

val cogrouped = rdd1.cogroup(rdd2) 
println("cogrouped: " + cogrouped.partitioner) 

val unioned = rdd1.union(rdd2) 
println("union: " + unioned.partitioner) 

Я вижу, что по умолчанию cogroup() всегда дает RDD с настроенным секционирования, но union() не делает, он всегда будет возвращаться обратно по умолчанию. Это нелогично, поскольку мы обычно предполагаем, что PairRDD должен использовать свой первый элемент в качестве ключа раздела. Есть ли способ «заставить» Spark объединить 2 PairRDD для использования того же ключа раздела?

ответ

33

union - очень эффективная операция, поскольку она не перемещает никаких данных. Если rdd1 имеет 10 разделов и rdd2 имеет 20 разделов, то rdd1.union(rdd2) будет иметь 30 разделов: разделы двух RDD, расположенных друг за другом. Это всего лишь бухгалтерское изменение, нет перетасовки.

Но обязательно он отбрасывает разделитель. Для заданного количества разделов построен разделитель. В результате RDD имеет несколько разделов, которые отличаются от rdd1 и rdd2.

После объединения вы можете запустить repartition, чтобы перетасовать данные и организовать их ключом.


Существует одно исключение из вышеперечисленного. Если rdd1 и rdd2 имеют одинаковый разделитель (с таким же количеством разделов), union ведет себя по-разному. Он присоединяется к разделам двух RDD попарно, указывая на то же количество разделов, что и каждый из входов. Это может включать перемещение данных вокруг (если разделы не были расположены друг с другом), но не будет тасовать. В этом случае разделитель сохраняется. (Код для этого значения находится в PartitionerAwareUnionRDD.scala.)

+4

На самом деле существует союз RDD с поддержкой разделителя, который, как я полагаю, должен использоваться автоматически в случаях, когда разделение может быть сохранено; не знаю, почему он здесь не применяется. См. Https://github.com/apache/spark/blob/e0628f2fae7f99d096f9dd625876a60d11020d9b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala#L123 и https://github.com/apache/spark /blob/master/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala –

+0

Вау, круто! Никогда не знал об этом. Похоже, он используется только тогда, когда оба RDD имеют один и тот же разделитель. Я добавлю это к ответу, спасибо! –

+0

Большое спасибо! Это очень важная оптимизация. BTW, если это не оптимально для всех случаев, я всегда могу написать объединение zip + in-partition в любом случае – tribbloid