2014-12-03 8 views
3

Я использовал функцию sparkContext.broadcast в своём приложении Spark Streaming для совместного использования пула соединений Redis (JedisPool). Однако broadcast для JedisPool через sparkContext не работает.

Код выглядит так:

// Lazily creates a Jedis connection pool on the driver and wraps it in a Spark broadcast
// variable on first access.
// NOTE: this is the statement that throws in the stack traces below, because
// redis.clients.jedis.JedisPool cannot be serialized for broadcasting.
lazy val redisPool = ssc.sparkContext.broadcast(Redis.createRedisPool(redisHost, redisPort))

Redis.createRedisPool определён так:

/** Factory helpers for building Jedis connection pools. */
object Redis {

  /**
   * Builds a pool for `host`:`port` with the default settings used by this file
   * (maxIdle = 5, maxTotal = 5, timeout = 5000) by delegating to the full overload.
   */
  def createRedisPool(host: String, port: Int): JedisPool =
    createRedisPool(host, port, 5, 5, 5000)

  /**
   * Builds a pool for `host`:`port` with explicit pool settings.
   *
   * @param host     Redis server host
   * @param port     Redis server port
   * @param maxIdle  value passed to `JedisPoolConfig.setMaxIdle`
   * @param maxTotal value passed to `JedisPoolConfig.setMaxTotal`
   * @param timeout  value passed to `JedisPoolConfig.setMaxWaitMillis`
   * @return a new `JedisPool` configured with the settings above
   */
  def createRedisPool(
      host: String,
      port: Int,
      maxIdle: Int,
      maxTotal: Int,
      timeout: Int): JedisPool = {
    val poolConfig = new JedisPoolConfig
    poolConfig.setMaxIdle(maxIdle)
    poolConfig.setMaxTotal(maxTotal)
    poolConfig.setMaxWaitMillis(timeout)
    new JedisPool(poolConfig, host, port)
  }
}

Это работает в локальном режиме, но когда я запускаю приложение в режиме YARN или standalone, например так:

spark-submit --master "yarn-client" --class ... 

я получаю ошибку:

Exception in thread "main" java.io.NotSerializableException: redis.clients.jedis.JedisPool 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1165) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:329) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210) 
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) 
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:84) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:82) 
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:154) 
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

Я попытался установить spark.serializer = org.apache.spark.serializer.KryoSerializer в своём приложении, но тогда получил другую ошибку:

Exception in thread "main" com.esotericsoftware.kryo.KryoException:  java.util.ConcurrentModificationException 
Serialization trace: 
classes (sun.misc.Launcher$AppClassLoader) 
classloader (java.security.ProtectionDomain) 
context (java.security.AccessControlContext) 
acc (org.apache.spark.executor.ExecutorURLClassLoader) 
factoryClassLoader (org.apache.commons.pool2.impl.GenericObjectPool) 
internalPool (redis.clients.jedis.JedisPool) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:585) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:318) 
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:293) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:549) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:570) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:213) 
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568) 
    at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:119) 
    at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:210) 
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:83) 
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:68) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:36) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:29) 
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:809) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$lzycompute$1(AdSysStreaming.scala:85) 
    at org.culiu.bd.streaming.AdSysStreaming$.redisPool$1(AdSysStreaming.scala:83) 
    at org.culiu.bd.streaming.AdSysStreaming$.main(AdSysStreaming.scala:155) 
    at org.culiu.bd.streaming.AdSysStreaming.main(AdSysStreaming.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) 
    at java.lang.reflect.Method.invoke(Method.java:597) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.util.ConcurrentModificationException 
    at java.util.AbstractList$Itr.checkForComodification(AbstractList.java:372) 
    at java.util.AbstractList$Itr.next(AbstractList.java:343) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:74) 
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:18) 
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:501) 
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.write(FieldSerializer.java:564) 
    ... 39 more 

Как я могу решить эту проблему?

+0

Есть ли ссылка на объект контекста внутри тела этого def? –

+0

Да, ssc определён как экземпляр StreamingContext, а ssc.sparkContext ссылается на SparkContext – Guy

ответ

3

Похоже, проблема в том, что класс redis.clients.jedis.JedisPool не является сериализуемым. Это не похоже на проблему, специфичную для Spark: думаю, любая попытка сериализовать этот класс потерпит неудачу.

+0

Есть ли какое-нибудь решение? –