Я хотел бы передать объект с узла драйвера на другие узлы, где находится RDD, так что каждый раздел RDD может получить доступ к этому объекту, как показано в следующем фрагменте.Как разрешить Spark сериализовать объект с помощью Kryo?
object HelloSpark {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("Testing HelloSpark")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "xt.HelloKryoRegistrator")
val sc = new SparkContext(conf)
val rdd = sc.parallelize(1 to 20, 4)
val bytes = new ImmutableBytesWritable(Bytes.toBytes("This is a test"))
rdd.map(x => x.toString + "-" + Bytes.toString(bytes.get) + " !")
.collect()
.foreach(println)
sc.stop
}
}
// My registrator
class HelloKryoRegistrator extends KryoRegistrator {
override def registerClasses(kryo: Kryo) = {
kryo.register(classOf[ImmutableBytesWritable], new HelloSerializer())
}
}
//My serializer
class HelloSerializer extends Serializer[ImmutableBytesWritable] {
override def write(kryo: Kryo, output: Output, obj: ImmutableBytesWritable): Unit = {
output.writeInt(obj.getLength)
output.writeInt(obj.getOffset)
output.writeBytes(obj.get(), obj.getOffset, obj.getLength)
}
override def read(kryo: Kryo, input: Input, t: Class[ImmutableBytesWritable]): ImmutableBytesWritable = {
val length = input.readInt()
val offset = input.readInt()
val bytes = new Array[Byte](length)
input.read(bytes, offset, length)
new ImmutableBytesWritable(bytes)
}
}
В приведенном выше фрагменте кода я попытался сериализации ImmutableBytesWritable по Kryo в Спарк, так что я сделал follwing:
- Сконфигурируйте SparkConf экземпляр передается искру контекст, то есть, установить "spark.serializer" в "org.apache.spark.serializer.KryoSerializer" и установить "spark.kryo.registrator" в "xt.HelloKryoRegistrator ";
- Написать пользовательский класс регистратора Kryo, в котором я зарегистрирую класс ImmutableBytesWritable;
- Написать сериалайзер для ImmutableBytesWritable
Однако, когда я представить мое заявление Спарк в режиме пряжи клиента, следующее было брошено исключение:
Исключение в потоке «основного» орг. apache.spark.SparkException: Задача не сериализуема на org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 166) at org.apache.spark.util.ClosureCleaner $ .clean (ClosureCleaner.scala: 158) по адресу org.apache.sp ark.SparkContext.clean (SparkContext.scala: 1242) at org.apache.spark.rdd.RDD.map (RDD.scala: 270) at xt.HelloSpark $ .main (HelloSpark.scala: 23) at xt .HelloSpark.main (HelloSpark.scala) в sun.reflect.NativeMethodAccessorImpl.invoke0 (метод Native) на sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:57) в sun.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl. java: 43) at java.lang.reflect.Method.invoke (Method.java:606) at org.apache.spark.deploy.SparkSubmit $ .launch (SparkSubmit.scala: 325) at org.apache.spark .deploy.SparkSubmit $ .main (SparkSubmit.scala: 75) at org.apache.spark.deploy.SparkSubmit.main (SparkSubmit.scala) Вызванный: java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable на java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1183) в java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java : 1547) на java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1508) на java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1431) в java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1177) на java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:347) на org.apache.spark.serializer.JavaSerializationStream.writeObject (JavaSerializer.scala: 42) в org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSer ializer.scala: 73) at org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala: 164) ...12 более
кажется, что ImmutableBytesWritable не может быть сериализуется Kryo. Итак, каков правильный способ позволить Spark сериализовать объект с помощью Kryo? Может ли Kryo сериализовать любой тип?
То же самое происходит со мной, даже с гораздо более простой конфигурации (только настройки сериализатора конфигурации и регистрации классов). Обратите внимание на эту строку вашего стека: 'org.apache.spark.serializer.JavaSerializerInstance.serialize (JavaSerializer.scala: 73)', по какой-то причине он пытается использовать сериализацию Java, даже если вы сказали ему не делать этого. –
Удалось ли вам это решить? У меня такая же проблема. – Nilesh