
Error when running a job that queries Cassandra via Spark SQL through Spark Jobserver

So I am trying to run a job that simply runs a query against Cassandra using Spark SQL. The job submits fine and starts up normally. The same code works when it is not run through the job server (i.e. when launched with a plain spark-submit). Can anyone tell me what is wrong with my job code or configuration files that causes the error below?

{
    "status": "ERROR",
    "ERROR": {
        "errorClass": "java.util.concurrent.ExecutionException",
        "cause": "Failed to open native connection to Cassandra at {127.0.1.1}:9042",
        "stack": [
            "com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:155)",
            "com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:141)",
            "com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:141)",
            "com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:31)",
            "com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:56)",
            "com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)",
            "com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:101)",
            "com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:112)",
            "com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:243)",
            "org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:22)",
            "org.apache.spark.sql.cassandra.CassandraCatalog$$anon$1.load(CassandraCatalog.scala:19)",
            "com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)",
            "com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)",
            "com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)",
            "com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2257)",
            "com.google.common.cache.LocalCache.get(LocalCache.java:4000)",
            "com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4004)",
            "com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)",
            "org.apache.spark.sql.cassandra.CassandraCatalog.lookupRelation(CassandraCatalog.scala:28)",
            "org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.org$apache$spark$sql$catalyst$analysis$OverrideCatalog$$super$lookupRelation(CassandraSQLContext.scala:218)",
            "org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:161)",
            "org.apache.spark.sql.catalyst.analysis.OverrideCatalog$$anonfun$lookupRelation$3.apply(Catalog.scala:161)",
            "scala.Option.getOrElse(Option.scala:120)",
            "org.apache.spark.sql.catalyst.analysis.OverrideCatalog$class.lookupRelation(Catalog.scala:161)",
            "org.apache.spark.sql.cassandra.CassandraSQLContext$$anon$2.lookupRelation(CassandraSQLContext.scala:218)",
            "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.getTable(Analyzer.scala:174)",
            "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:186)",
            "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$6.applyOrElse(Analyzer.scala:181)",
            "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)",
            "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:188)",
            "org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:51)",
            "org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:187)",
            "org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:208)",
            "scala.collection.Iterator$$anon$11.next(Iterator.scala:328)",
            "scala.collection.Iterator$class.foreach(Iterator.scala:727)",
            "scala.collection.AbstractIterator.foreach(Iterator.scala:1157)",
            "scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)",
            "scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)",
            "scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)",
            "scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)",
            "scala.collection.AbstractIterator.to(Iterator.scala:1157)",
            "scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)",
            "scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)",
            "scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)",
            "scala.collection.AbstractIterator.toArray(Iterator.scala:1157)",
            "org.apache.spark.sql.catalyst.trees.TreeNode.transformChildrenDown(TreeNode.scala:238)",
            "org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:193)",
            "org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:178)",
            "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:181)",
            "org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:171)",
            "org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61)",
            "org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59)",
            "scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)",
            "scala.collection.immutable.List.foldLeft(List.scala:84)",
            "org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59)",
            "org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51)",
            "scala.collection.immutable.List.foreach(List.scala:318)",
            "org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51)",
            "org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:1082)",
            "org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:1082)",
            "org.apache.spark.sql.SQLContext$QueryExecution.assertAnalyzed(SQLContext.scala:1080)",
            "org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)",
            "org.apache.spark.sql.cassandra.CassandraSQLContext.cassandraSql(CassandraSQLContext.scala:211)",
            "org.apache.spark.sql.cassandra.CassandraSQLContext.sql(CassandraSQLContext.scala:214)",
            "CassSparkTest$.runJob(CassSparkTest.scala:23)",
            "CassSparkTest$.runJob(CassSparkTest.scala:9)",
            "spark.jobserver.JobManagerActor$$anonfun$spark$jobserver$JobManagerActor$$getJobFuture$4.apply(JobManagerActor.scala:235)",
            "scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)",
            "scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)",
            "java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)",
            "java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)",
            "java.lang.Thread.run(Thread.java:745)"
        ],
        "causingClass": "java.io.IOException",
        "message": "java.io.IOException: Failed to open native connection to Cassandra at {127.0.1.1}:9042"
    }
}
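As mentioned above, the same code runs fine outside the job server. A rough sketch of the kind of spark-submit invocation that works (the application jar name cass-spark-test.jar is an assumption for illustration; the connector assembly path is the one from the config below):

# launching directly, with the Cassandra host passed as a dotted Spark property
spark-submit --class CassSparkTest \
    --master spark://192.168.10.11:7077 \
    --conf spark.cassandra.connection.host=127.0.0.1 \
    --jars /home/vagrant/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar \
    cass-spark-test.jar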

Here is the job I am running:

import org.apache.spark.{SparkContext, SparkConf} 
import com.datastax.spark.connector._ 
import org.apache.spark.sql.cassandra.CassandraSQLContext 
import org.apache.spark.sql._ 
import spark.jobserver._ 
import com.typesafe.config.Config 
import com.typesafe.config.ConfigFactory 

object CassSparkTest extends SparkJob {
    // Entry point used when the job is launched directly with spark-submit
    def main(args: Array[String]) {
        val sc = new SparkContext("spark://192.168.10.11:7077", "test")
        val config = ConfigFactory.parseString("")
        val results = runJob(sc, config)
        println("Results:" + results)
    }

    override def validate(sc: SparkContext, config: Config): SparkJobValidation = {
        SparkJobValid
    }

    override def runJob(sc: SparkContext, config: Config): Any = {
        // Run the SQL supplied as "input.sql" in the job config against Cassandra
        val sqlC = new CassandraSQLContext(sc)
        val df = sqlC.sql(config.getString("input.sql"))
        df.collect()
    }
}
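Note that runJob reads the query from the job configuration via config.getString("input.sql"), so the SQL has to be supplied at submission time. A minimal sketch of submitting this job through the job server's REST API (the app name, keyspace, and table names are assumptions; port 2020 matches the jobserver.port setting in the config below):

# upload the assembly jar containing CassSparkTest under the app name "test"
curl --data-binary @cass-spark-test.jar localhost:2020/jars/test

# run the job, passing the SQL query as "input.sql" in the job config
curl -d 'input.sql = "SELECT * FROM mykeyspace.mytable"' \
    'localhost:2020/jobs?appName=test&classPath=CassSparkTest'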

and here is my configuration file for Spark Jobserver:

# Template for a Spark Job Server configuration file 
# When deployed these settings are loaded when job server starts 
# 
# Spark Cluster/Job Server configuration 
spark { 
    # spark.master will be passed to each job's JobContext 
    master = "spark://192.168.10.11:7077" 
    # master = "mesos://vm28-hulk-pub:5050" 
    # master = "yarn-client" 

    # Default # of CPUs for jobs to use for Spark standalone cluster 
    job-number-cpus = 1 

    jobserver {
        port = 2020
        jar-store-rootdir = /tmp/jobserver/jars

        jobdao = spark.jobserver.io.JobFileDAO

        filedao {
            rootdir = /tmp/spark-job-server/filedao/data
        }
    }

    # predefined Spark contexts 
    # contexts { 
    # my-low-latency-context { 
    #  num-cpu-cores = 1   # Number of cores to allocate. Required. 
    #  memory-per-node = 512m   # Executor memory per node, -Xmx style eg 512m, 1G, etc. 
    # } 
    # # define additional contexts here 
    # } 

    # universal context configuration. These settings can be overridden, see README.md 
    context-settings {
        num-cpu-cores = 1        # Number of cores to allocate. Required.
        memory-per-node = 512m   # Executor memory per node, -Xmx style eg 512m, 1G, etc.

        # in case spark distribution should be accessed from HDFS (as opposed to being installed on every mesos slave)
        # spark.executor.uri = "hdfs://namenode:8020/apps/spark/spark.tgz"

        spark-cassandra-connection-host="127.0.0.1"

        # uris of jars to be loaded into the classpath for this context. Uris is a string list, or a string separated by commas ','
        # dependent-jar-uris = ["file:///some/path/present/in/each/mesos/slave/somepackage.jar"]
        dependent-jar-uris = ["file:///home/vagrant/lib/spark-cassandra-connector-assembly-1.3.0-M2-SNAPSHOT.jar"]

        # If you wish to pass any settings directly to the sparkConf as-is, add them here in passthrough,
        # such as hadoop connection settings that don't use the "spark." prefix
        passthrough {
            #es.nodes = "192.1.1.1"
        }
    }

    # This needs to match SPARK_HOME for cluster SparkContexts to be created successfully 
    # home = "/home/spark/spark" 
} 

# Note that you can use this file to define settings not only for job server, 
# but for your Spark jobs as well. Spark job configuration merges with this configuration file as defaults. 

Answer

@vicg, first you need spark.cassandra.connection.host (periods, not dashes). Also notice that the error reports the IP "127.0.1.1", not the one in your config. You can also pass the IP when creating a context, for example:

curl -X POST 'localhost:8090/contexts/my-context?spark.cassandra.connection.host=127.0.0.1'

If the above doesn't work, also try this PR: https://github.com/spark-jobserver/spark-jobserver/pull/164
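Once a context has been created that way, jobs can be submitted against it by name via the context query parameter, e.g. (the app name and SQL here are illustrative assumptions, and 8090 is the job server's default port):

curl -d 'input.sql = "SELECT * FROM mykeyspace.mytable"' \
    'localhost:8090/jobs?appName=test&classPath=CassSparkTest&context=my-context'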


This worked. Passing the IP when creating the context is what did it. When I tried adding "spark.cassandra.connection.host=127.0.0.1" to my context settings, it didn't work. – vicg