У меня возникают проблемы с доступом к переменной изнутри функции преобразования. Может ли кто-нибудь помочь мне? Вот мои соответствующие классы и функции.Невозможно получить доступ к широковещательной переменной при трансформации
@SerialVersionUID(889949215L)
object MyCache extends Serializable {
@transient lazy val logger = Logger(getClass.getName)
@volatile var cache: Broadcast[Map[UUID, Definition]] = null
def getInstance(sparkContext: SparkContext) : Broadcast[Map[UUID, Definition]] = {
if (cache == null) {
synchronized {
val map = sparkContext.cassandraTable("keyspace", "table")
.collect()
.map(m => m.getUUID("id") ->
Definition(m.getString("c1"), m.getString("c2"), m.getString("c3"),
m.getString("c4"))).toMap
cache = sparkContext.broadcast(map)
}
}
cache
}
}
В другом файле:
object Processor extends Serializable {
@transient lazy val logger = Logger(getClass.getName)
def processData[T: ClassTag](rawStream: DStream[(String, String)], ssc: StreamingContext,
processor: (String, Broadcast[Map[UUID, Definition]]) => T): DStream[T] = {
MYCache.getInstance(ssc.sparkContext)
var newCacheValues = Map[UUID, Definition]()
rawStream.cache()
rawStream
.transform(rdd => {
val array = rdd.collect()
array.foreach(r => {
val value = getNewCacheValue(r._2, rdd.context)
if (value.isDefined) {
newCacheValues = newCacheValues + value.get
}
})
rdd
})
if (newCacheValues.nonEmpty) {
logger.info(s"Rebroadcasting. There are ${newCacheValues.size} new values")
logger.info("Destroying old cache")
MyCache.cache.destroy()
// this is probably wrong here, destroying object, but then referencing it. But I haven't gotten to this part yet.
MyCache.cache = ssc.sparkContext.broadcast(MyCache.cache.value ++ newCacheValues)
}
rawStream
.map(r => {
println("######################")
println(MyCache.cache.value)
r
})
.map(r => processor(r._2, MyCache.cache.value))
.filter(r => null != r)
}
}
Каждый раз, когда я запускаю это я получаю SparkException: Failed to get broadcast_1_piece0 of broadcast_1
при попытке получить доступ к cache.value
Когда я добавить println(MyCache.cache.values)
сразу после .getInstance
я способный получить доступ к широковещательной переменной, но когда я развертываю его в кластере mesos, я не могу снова получить доступ к значениям широковещания, но с исключением с нулевым указателем.
Update:
ошибка, что я вижу на println(MyCache.cache.value)
. Я не должен был добавлять эту инструкцию if, содержащую уничтожение, потому что мои тесты никогда не ударяют об этом.
Основы моего приложения: у меня есть таблица в cassandra, которая не будет обновляться очень сильно. Но мне нужно сделать некоторые проверки на некоторые потоковые данные. Поэтому я хочу вытащить все данные из этой таблицы, которые не обновляются много, в память. getInstance
вытягивает всю таблицу при запуске, а затем я проверяю все свои потоковые данные, чтобы увидеть, нужно ли мне снова извлечь из cassandra (что мне придется очень редко). Преобразование и сбор - это то место, где я проверяю, нужно ли мне вставлять новые данные. Но поскольку есть вероятность, что моя таблица будет обновлена, мне придется периодически обновлять трансляцию. Поэтому моя идея состояла в том, чтобы уничтожить его, а затем ретранслировать. Я обновлю это, как только я получу другой материал.
У меня такая же ошибка, если я прокомментирую уничтожение и ретрансляцию.
Еще одно обновление:
Мне нужно получить доступ к переменной широковещательной передачи в processor
эту строку: .map(r => processor(r._2, MyCache.cache.value))
.
Я могу транслировать переменную в преобразовании, и если я println(MyCache.cache.value)
в преобразовании, то все мои тесты пройдены, и я в состоянии получить доступ к трансляции в processor
Update:
rawStream
.map(r => {
println("$$$$$$$$$$$$$$$$$$$")
println(metrics.value)
r
})
Это трассировка стека, которую я получаю, когда она попадает в эту линию.
ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 135.0 (TID 114)
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1222)
at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData$4.apply(StreamProcessors.scala:160)
at com.uptake.readings.ingestion.StreamProcessors$$anonfun$processIncomingKafkaData$4.apply(StreamProcessors.scala:158)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:370)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:414)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:284)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_1_piece0 of broadcast_1
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1219)
... 24 more
В какую строку является ошибка? Это похоже на тот, который вы даже прокомментировали «это, вероятно, неправильно», потому что это первый доступ к «MyCache.cache.value», и он не должен работать. Вызов 'rdd.collect()' inside 'transform' также мне кажется странным. –
Мне кажется, что вы на неправильном пути. Broadcasting предназначен для распространения чего-то статического (как неизменяемая карта) для всех работников для быстрого доступа. По внешнему виду вы пытаетесь _build up_ map, и вы не должны использовать трансляцию для этого. И я согласен с @ Alexey Romanov, что вызов 'rdd.collect' кажется очень странным, так как весь rdd затем обрабатывается на драйвере, а не параллельно, а параллельная обработка - это то, что Spark превосходит ... –
К сожалению, извините Я добавлю обновление к моему вопросу. Здесь я не придавал особого значения. – nickn