2015-02-17 5 views
6

Я хотел бы передать объект с узла драйвера на другие узлы, где находится RDD, так что каждый раздел RDD может получить доступ к этому объекту, как показано в следующем фрагменте.Как разрешить Spark сериализовать объект с помощью Kryo?

object HelloSpark { 
    def main(args: Array[String]): Unit = { 
     val conf = new SparkConf() 
       .setAppName("Testing HelloSpark") 
       .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
       .set("spark.kryo.registrator", "xt.HelloKryoRegistrator") 

     val sc = new SparkContext(conf) 
     val rdd = sc.parallelize(1 to 20, 4) 
     val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test")) 

     rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !") 
      .collect() 
      .foreach(println) 

     sc.stop 
    } 
} 

// My registrator 
class HelloKryoRegistrator extends KryoRegistrator { 
    override def registerClasses(kryo: Kryo) = { 
     kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer()) 
    } 
} 

//My serializer 
class HelloSerializer extends Serializer[ImmutableBytesWritable] { 
    override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = { 
     output.writeInt(obj.getLength) 
     output.writeInt(obj.getOffset) 
     output.writeBytes(obj.get(), obj.getOffset, obj.getLength) 
    } 

    override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = { 
     val length = input.readInt() 
     val offset = input.readInt() 
     val bytes = new Array[Byte](length) 
     input.read(bytes, offset, length) 

     new ImmutableBytesWritable(bytes) 
    } 
} 

В приведенном выше фрагменте кода я попытался сериализации ImmutableBytesWritable по Kryo в Спарк, так что я сделал follwing:

  1. Сконфигурируйте SparkConf экземпляр передается искру контекст, то есть, установить "spark.serializer" в "org.apache.spark.serializer.KryoSerializer" и установить "spark.kryo.registrator" в "xt.HelloKryoRegistrator ";
  2. Написать пользовательский класс регистратора Kryo, в котором я зарегистрирую класс ImmutableBytesWritable;
  3. Написать сериалайзер для ImmutableBytesWritable

Однако, когда я представить мое заявление Спарк в режиме пряжи клиента, следующее было брошено исключение:

Исключение в потоке «основного» орг. apache.spark.SparkException: Задача не сериализуема на org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) at org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 158) по адресу org.apache.sp ark.SparkContext.clean (SparkContext.scala: 1242) at org.apache.spark.rdd.RDD.map (RDD.scala: 270) at xt.HelloSpark $ .main (HelloSpark.scala: 23) at xt .HelloSpark.main (HelloSpark.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (метод Native) на sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:57) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) at java.lang.reflect.Method.invoke (Method.java:606) at org.apache.spark.deploy.SparkSubmit $ .launch (SparkSubmit.scala: 325) at org.apache.spark .deploy.SparkSubmit $ .main (SparkSubmit.scala: 75) at org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala) Вызванный: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable на java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1183) в java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java : 1547) на java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1508) на java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1431) в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1177) на java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:347) на org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSer ializer.scala: 73) at org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 164) ...12 более

кажется, что ImmutableBytesWritable не может быть сериализуется Kryo. Итак, каков правильный способ позволить Spark сериализовать объект с помощью Kryo? Может ли Kryo сериализовать любой тип?

+0

То же самое происходит со мной, даже с гораздо более простой конфигурации (только настройки сериализатора конфигурации и регистрации классов). Обратите внимание на эту строку вашего стека: 'org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 73)', по какой-то причине он пытается использовать сериализацию Java, даже если вы сказали ему не делать этого. –

+0

Удалось ли вам это решить? У меня такая же проблема. – Nilesh

ответ

0

Это происходит потому, что вы используете ImmutableBytesWritable в своем закрытии. Spark не поддерживает сериализацию замыкания с Kryo еще (только объекты в RDD). Вы можете воспользоваться помощью этого, чтобы решить вашу проблему:

Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?

Вам просто нужно сериализовать объекты перед прохождением через крышку, и де-сериализации впоследствии. Этот подход просто работает, даже если ваши классы не являются Serializable, потому что он использует Kryo за кулисами. Все, что вам нужно, это карри. ;)

Вот пример эскиза:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)]) 
       (foo: Foo) : Bar = { 
    kryoWrapper.value.apply(foo) 
} 
val mapper = genMapper(KryoSerializationWrapper(new ImmutableBytesWritable(Bytes.toBytes("This is a test")))) _ 
rdd.flatMap(mapper).collectAsMap() 

object ImmutableBytesWritable(bytes: Bytes) extends (Foo => Bar) { 
    def apply(foo: Foo) : Bar = { //This is the real function } 
}