2016-11-23 10 views
5

Мне было интересно, что отличает использование искры mapPartitions Функциональность vs transient lazy val.
Поскольку каждый раздел в основном работает на другом узле, каждый экземпляр переходного ленивого val будет создан на каждом узле (при условии, что он находится в объекте).Spark mapPartitions vs transient lazy val

Например:

class NotSerializable(v: Int) { 
    def foo(a: Int) = ??? 
} 

object OnePerPartition { 
    @transient lazy val obj: NotSerializable = new NotSerializable(10) 
} 

object Test extends App{ 
    val conf = new SparkConf().setMaster("local[2]").setAppName("test") 
    val sc = new SparkContext(conf) 

    val rdd: RDD[Int] = sc.parallelize(1 to 100000) 

    rdd.map(OnePerPartition.obj.foo) 

    // ---------- VS ---------- 

    rdd.mapPartitions(itr => { 
     val obj = new NotSerializable(10) 
     itr.map(obj.foo) 
    }) 
} 

Можно было бы спросить, почему бы вам даже хочет ...
Я хотел бы создать общий контейнер понятие для запуска моей логики в любом обычном осуществлении сбора (RDD, List , scalding pipe и т. Д.)
Все они имеют понятие «карта», но mapPartition уникально для spark.

ответ

2

Прежде всего, вам не нужно transientlazy здесь. Использование object оболочки достаточно, чтобы сделать эту работу, и вы можете написать это как:

object OnePerExecutor { 
    val obj: NotSerializable = new NotSerializable(10) 
} 

Существует фундаментальное различие между объектом оболочки и инициализации NotSerializable внутри mapPartitions. Это:

rdd.mapPartitions(iter => { 
    val ns = NotSerializable(1) 
    ??? 
}) 

создает один NotSerializable экземпляр каждого раздела.

Обертка объектов, с другой стороны, создает один экземпляр NotSerializable для каждого исполнителя JVM. В результате этого экземпляра:

  • Может использоваться для обработки нескольких разделов.
  • Можно получить доступ одновременно несколькими потоками исполнителей.
  • Имеет срок службы, превышающий функциональный вызов, где он используется.

Это означает, что он должен быть потокобезопасным, и любые вызовы методов должны быть свободными от побочных эффектов.