2016-06-24 7 views
1

У меня возникают проблемы с доступом к переменной изнутри функции преобразования. Может ли кто-нибудь помочь мне? Вот мои соответствующие классы и функции.Невозможно получить доступ к широковещательной переменной при трансформации

@SerialVersionUID(889949215L) 
object MyCache extends Serializable { 
    @transient lazy val logger = Logger(getClass.getName) 
    @volatile var cache: Broadcast[Map[UUID, Definition]] = null 

    def getInstance(sparkContext: SparkContext) : Broadcast[Map[UUID, Definition]] = { 
     if (cache == null) { 
      synchronized { 
       val map = sparkContext.cassandraTable("keyspace", "table") 
        .collect() 
        .map(m => m.getUUID("id") -> 
         Definition(m.getString("c1"), m.getString("c2"), m.getString("c3"), 
           m.getString("c4"))).toMap 
       cache = sparkContext.broadcast(map) 
      } 
     } 
     cache 
    } 
} 

В другом файле:

object Processor extends Serializable { 
    @transient lazy val logger = Logger(getClass.getName) 

    def processData[T: ClassTag](rawStream: DStream[(String, String)], ssc: StreamingContext, 
             processor: (String, Broadcast[Map[UUID, Definition]]) => T): DStream[T] = { 
     MYCache.getInstance(ssc.sparkContext) 
     var newCacheValues = Map[UUID, Definition]() 
     rawStream.cache() 
     rawStream 
      .transform(rdd => { 
       val array = rdd.collect() 
       array.foreach(r => { 
         val value = getNewCacheValue(r._2, rdd.context) 
         if (value.isDefined) { 
          newCacheValues = newCacheValues + value.get 
         } 
       }) 
       rdd 
      }) 
     if (newCacheValues.nonEmpty) { 
      logger.info(s"Rebroadcasting. There are ${newCacheValues.size} new values") 
      logger.info("Destroying old cache") 
      MyCache.cache.destroy() 
      // this is probably wrong here, destroying object, but then referencing it. But I haven't gotten to this part yet. 
      MyCache.cache = ssc.sparkContext.broadcast(MyCache.cache.value ++ newCacheValues) 
     } 
     rawStream 
      .map(r => { 
       println("######################") 
       println(MyCache.cache.value) 
       r 
      }) 
      .map(r => processor(r._2, MyCache.cache.value)) 
      .filter(r => null != r) 
    } 
} 

Каждый раз, когда я запускаю это я получаю SparkException: Failed to get broadcast_1_piece0 of broadcast_1 при попытке получить доступ к cache.value

Когда я добавить println(MyCache.cache.values) сразу после .getInstance я способный получить доступ к широковещательной переменной, но когда я развертываю его в кластере mesos, я не могу снова получить доступ к значениям широковещания, но с исключением с нулевым указателем.

Update:

ошибка, что я вижу на println(MyCache.cache.value). Я не должен был добавлять эту инструкцию if, содержащую уничтожение, потому что мои тесты никогда не ударяют об этом.

Основы моего приложения: у меня есть таблица в cassandra, которая не будет обновляться очень сильно. Но мне нужно сделать некоторые проверки на некоторые потоковые данные. Поэтому я хочу вытащить все данные из этой таблицы, которые не обновляются много, в память. getInstance вытягивает всю таблицу при запуске, а затем я проверяю все свои потоковые данные, чтобы увидеть, нужно ли мне снова извлечь из cassandra (что мне придется очень редко). Преобразование и сбор - это то место, где я проверяю, нужно ли мне вставлять новые данные. Но поскольку есть вероятность, что моя таблица будет обновлена, мне придется периодически обновлять трансляцию. Поэтому моя идея состояла в том, чтобы уничтожить его, а затем ретранслировать. Я обновлю это, как только я получу другой материал.

У меня такая же ошибка, если я прокомментирую уничтожение и ретрансляцию.

Еще одно обновление:

Мне нужно получить доступ к переменной широковещательной передачи в processor эту строку: .map(r => processor(r._2, MyCache.cache.value)).

Я могу транслировать переменную в преобразовании, и если я println(MyCache.cache.value) в преобразовании, то все мои тесты пройдены, и я в состоянии получить доступ к трансляции в processor

Update:

rawStream 
    .map(r => { 
     println("$$$$$$$$$$$$$$$$$$$") 
     println(metrics.value) 
     r 
    }) 

Это трассировка стека, которую я получаю, когда она попадает в эту линию.

ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 135.0 (TID 114) 
    java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1 
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222) 
     at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165) 
     at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) 
     at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) 
     at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88) 
     at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) 
     at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData$4.apply(StreamProcessors.scala:160) 
     at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData$4.apply(StreamProcessors.scala:158) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) 
     at scala.collection.Iterator$$anon$11.next(Iterator.scala:370) 
     at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:414) 
     at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284) 
     at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171) 
     at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:268) 
     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306) 
     at org.apache.spark.rdd.RDD.iterator(RDD.scala:270) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
     at org.apache.spark.scheduler.Task.run(Task.scala:89) 
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
    Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1 
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138) 
     at scala.Option.getOrElse(Option.scala:121) 
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137) 
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120) 
     at scala.collection.immutable.List.foreach(List.scala:381) 
     at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120) 
     at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175) 
     at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219) 
     ... 24 more 
+2

В какую строку является ошибка? Это похоже на тот, который вы даже прокомментировали «это, вероятно, неправильно», потому что это первый доступ к «MyCache.cache.value», и он не должен работать. Вызов 'rdd.collect()' inside 'transform' также мне кажется странным. –

+2

Мне кажется, что вы на неправильном пути. Broadcasting предназначен для распространения чего-то статического (как неизменяемая карта) для всех работников для быстрого доступа. По внешнему виду вы пытаетесь _build up_ map, и вы не должны использовать трансляцию для этого. И я согласен с @ Alexey Romanov, что вызов 'rdd.collect' кажется очень странным, так как весь rdd затем обрабатывается на драйвере, а не параллельно, а параллельная обработка - это то, что Spark превосходит ... –

+0

К сожалению, извините Я добавлю обновление к моему вопросу. Здесь я не придавал особого значения. – nickn

ответ

2

[Обновлено ответ]

Вы получаете сообщение об ошибке, потому что код внутри rawStream.map т.е. MyCache.cache.value становится выполнен на одном из исполнителя и там MyCache.cache еще null!

Когда вы сделали MyCache.getInstance, он создал значение MyCache.cache на водителя и транслировать его в порядке.Но вы не имеете в виду один и тот же объект в вашем методе map, поэтому он не передается исполнителям. Вместо этого, поскольку вы прямо ссылаетесь на MyCache, исполнители ссылаются на MyCache.cache на свою собственную копию объекта MyCache, и это, очевидно, равно null.

Вы можете заставить это работать должным образом, предварительно получив экземпляр объекта вещания cache внутри драйвера и используя , что объект на карте. Следующий код должен работать на вас -

val cache = MYCache.getInstance(ssc.sparkContext) 
rawStream.map(r => { 
        println(cache.value) 
        r 
      }) 
+0

К сожалению, я не придавал особого значения, позвольте мне уточнить мой вопрос. – nickn

+0

Думаю, теперь я вижу проблему. (Я новичок в Stackoverflow. Не уверен, что принятая процедура для обновления ответа - но я буду обновлять оригинальный ответ.) –

+0

Прохладный спасибо. Это имеет большой смысл. Но тогда как я могу ссылаться на это в других файлах? Я думал, что добавление его как свойства в этот статический класс будет делать то же самое? Таким образом, мне не нужно передавать его каждой функции, к которой я хочу обратиться. – nickn