2014-09-03 5 views
1

Я пытался понять, как искровые потоки и hbase подключаются, но не были успешными. То, что я пытаюсь сделать, это дать искровой поток, обработать этот поток и сохранить результаты в таблице hbase. До сих пор это то, что у меня есть:Spark Streaming в HBase с логикой фильтрации

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get} 
import org.apache.hadoop.hbase.util.Bytes 

def blah(row: Array[String]) { 
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "table") 
    val thePut = new Put(Bytes.toBytes(row(0))) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0))) 
    hTable.put(thePut) 
} 

val ssc = new StreamingContext(sc, Seconds(1)) 
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) 
val words = lines.map(_.split(",")) 
val store = words.foreachRDD(rdd => rdd.foreach(blah)) 
ssc.start() 

В настоящее время я выполняю вышеуказанный код в искровой оболочке. Я не уверен, что я делаю неправильно.
я получаю следующее сообщение об ошибке в оболочке:

14/09/03 16:21:03 ERROR scheduler.JobScheduler: Error running job streaming job 1409786463000 ms.0 

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext 

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) 

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) 

at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) 

at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 

at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 

at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) 

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770) 

at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713) 

at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697) 

at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176) 

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 

at akka.actor.ActorCell.invoke(ActorCell.scala:456) 

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 

at akka.dispatch.Mailbox.run(Mailbox.scala:219) 

at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 

at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 

at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 

at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 

at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Я также проверил таблицу Hbase, на всякий случай, и ничего нового не написано там.

Я запускаю nc -lk 9999 на другом терминале для подачи данных в искровую оболочку для тестирования.

+0

Не могли бы вы вставить полную дорожку стека? Вы должны уметь получить, какой класс вызывает эту ошибку. – zsxwing

+0

Ни один из классов hbase не сериализуем - убедитесь, что вы не случайно их сериализуете. Я не вижу ничего очевидного в вашем коде, но – David

ответ

1

С помощью пользователей группы пользователей искры я смог выяснить, как заставить это работать. Похоже, что мне нужно, чтобы обернуть мое потоковый, картирование и Еогеасп вызов вокруг сериализуемого объекта:

import org.apache.spark.SparkConf 
import org.apache.spark.streaming.{Seconds, StreamingContext} 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.storage.StorageLevel 
import org.apache.hadoop.hbase.HBaseConfiguration 
import org.apache.hadoop.hbase.client.{HBaseAdmin,HTable,Put,Get} 
import org.apache.hadoop.hbase.util.Bytes 

object Blaher { 
    def blah(row: Array[String]) { 
    val hConf = new HBaseConfiguration() 
    val hTable = new HTable(hConf, "table") 
    val thePut = new Put(Bytes.toBytes(row(0))) 
    thePut.add(Bytes.toBytes("cf"), Bytes.toBytes(row(0)), Bytes.toBytes(row(0))) 
    hTable.put(thePut) 
    } 
} 

object TheMain extends Serializable{ 
    def run() { 
    val ssc = new StreamingContext(sc, Seconds(1)) 
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) 
    val words = lines.map(_.split(",")) 
    val store = words.foreachRDD(rdd => rdd.foreach(Blaher.blah)) 
    ssc.start() 
    } 
} 

TheMain.run() 
+2

Для разъяснения другим, причина, по которой он был неудачен раньше, был, вероятно, потому, что функция blah присутствовала в основной функции. поэтому, когда blah использовался в rdd.foreach(), закрытие функции blah необходимо было сериализовать, а закрытие включало другие объекты в основную функцию. Поэтому система непреднамеренно пыталась сериализовать ненужные вещи. Перемещение функции blah на другой объект разрешило это, оставив закрытие чистым. –

0

Кажется типичные антипаттерны. См. Главу «Дизайн шаблонов для использования foreachRDD» на странице http://spark.apache.org/docs/latest/streaming-programming-guide.html для правильной картины.

+0

Ответы Link-only не рекомендуется на StackOverflow, так как ссылки могут сломаться, даже если они содержат правильный ответ. Было бы полезно, если бы вы могли отредактировать ответ, чтобы включить соответствующий материал из ссылки. –