1

У меня есть искробезопасный код, который работает в клиентском режиме: он считывает данные с kafka, выполняет некоторую обработку и использует разъем spark-cassandra для вставки данных в cassandra.Исправленная потоковая передача не вставляет данные в Cassandra

Когда я использую «--deploy-режим кластера», данные не вставляются, и я получаю следующее сообщение об ошибке:

Exception in thread "streaming-job-executor-53" java.lang.NoClassDefFoundError: com/datastax/spark/connector/ColumnSelector at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:94) at com.enerbyte.spark.jobs.wattiopipeline.WattiopipelineStreamingJob$$anonfun$main$2.apply(WattiopipelineStreamingJob.scala:88) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50) at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224) at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.ClassNotFoundException: com.datastax.spark.connector.ColumnSelector at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

я добавил к разъему Зависимость от так:

"com.datastax.spark" %% "spark-cassandra-connector" % "1.5.0" % "provided"

Это мой код приложения:

val measurements = KafkaUtils.createDirectStream[ 
    Array[Byte], 
    Array[Byte], 
    DefaultDecoder, 
    DefaultDecoder](ssc, kafkaConfig, Set("wattio" 
)) 
    .map { 
    case (k, v) => { 
     val decoder = new AvroDecoder[WattioMeasure](null, 
     WattioMeasure.SCHEMA$) 
     decoder.fromBytes(v) 
    } 
    } 

//inserting into WattioRaw 
WattioFunctions.run(WattioFunctions. 
    processWattioRaw(measurements))(
    (rdd: RDD[ 
    WattioTenantRaw], t: Time) => { 
    rdd.cache() 
    //get all the different tenants 
    val differentTenants = rdd.map(a 
    => a.tenant).distinct().collect() 
    // for each tenant, create keyspace value and flush to cassandra 
    differentTenants.foreach(tenant => { 
     val keyspace = tenant + "_readings" 
     rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "wattio_raw") 
    }) 
    rdd.unpersist(true) 
    } 
) 

ssc.checkpoint("/tmp") 
ssc.start() 
ssc.awaitTermination() 
+0

Как вы определяете зависимость коннектора во время выполнения? Какова ваша полная команда запуска? – RussS

ответ

1

Вы должны убедиться, что ваш JAR доступен для рабочих. Исходный мастер откроет файловый сервер после запуска задания.

Вам необходимо указать путь к вашей uber jar либо с помощью SparkContext.setJars, либо через --jars флаг, перейденный в .

From the documentation

When using spark-submit, the application jar along with any jars included with the --jars option will be automatically transferred to the cluster. Spark uses the following URL scheme to allow different strategies for disseminating jars

0

На самом деле я решил его удаление «при условии» в списке зависимостей, так что SBT упакована искровой-CASSANDRA-разъем к моим сборкам банке.

Интересно то, что в моем запуске сценарии, даже когда я судимый использовать

spark-submit --repositories "location of my artifactory repository" --packages "spark-cassandra-connector"

или

spark-submit --jars "spark-cassandra-connector.jar"

оба не удались!

0

Предоставлено означает, что вы ожидаете, что JDK или контейнер должны обеспечить зависимость во время выполнения, и эта конкретная баночка-зависимость не будет частью вашего окончательного приложения War/jar, созданного вами, поэтому эта ошибка.