2015-01-29 1 views
1

ОБНОВЛЕНИЕ постановки задачиКак включить SQL на SchemaRDD через интерфейс JDBC? (Это возможно?)

Мы используем искру 1.2.0 (Hadoop 2,4). Мы определили SchemaRDD, используя файлы данных в HDFS, и хотели бы включить запрос этих таблиц через HiveServer2. Мы сталкиваемся с исключениями во время выполнения, пытаясь сэкономить AsTable, и хотели бы, чтобы руководство было продолжено.

код Источник:

package foo.bar 

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.spark.SparkConf 
import org.apache.spark.sql._ 
import org.apache.spark._ 
import org.apache.spark.sql.hive._ 

object HiveDemo { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("Demo") 
    val sc = new SparkContext(conf) 

    // sc is an existing SparkContext. 
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 

    // Create an RDD 
    val zipRDD = sc.textFile("/model-inputs/all_zip_state.csv") 

    // The schema is encoded in a string 
    val schemaString = "ODSMEMBERID,ZIPCODE,STATE,TEST_SUPPLIERID,ratio_death_readm_low,ratio_death_readm_high,regions" 

    // Generate the schema based on the string of schema 
    val schema = 
     StructType(
     schemaString.split(",").map(fieldName => StructField(fieldName, StringType, true))) 

    // Convert records of the RDD (zip) to Rows. 
    val rowRDD = zipRDD.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5), "")) 

    // Apply the schema to the RDD. 
    val zipSchemaRDD = hiveContext.applySchema(rowRDD, schema) 

    // HiveContext's save as Table 
    zipSchemaRDD.saveAsTable("allzipstable") 

    } 
} 

искровым отправить Команда:

./bin/spark-submit --class foo.bar.HiveDemo --master yarn-cluster --jars /usr/lib/hive/lib/hive-metastore.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar,/usr/lib/spark-1.2.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 1 lib/datapipe_2.10-1.0.jar 10 

Исключение во время выполнения на узле:

15/01/29 22:35:50 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree: 
'CreateTableAsSelect None, allzipstable, false, None 
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30 
) 
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree: 
'CreateTableAsSelect None, allzipstable, false, None 
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MappedRDD[3] at map at HiveDemo.scala:30 

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) 
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) 
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) 
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) 
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) 
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) 
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) 
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) 
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413) 
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413) 
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418) 
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416) 
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422) 
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) 
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126) 
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108) 
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:36) 
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) 
15/01/29 22:35:50 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook 

Еще одна попытка:

package foo.bar 

import org.apache.spark.{ SparkConf, SparkContext } 
import org.apache.spark.sql._ 

case class AllZips(
    ODSMEMBERID: String, 
    ZIPCODE: String, 
    STATE: String, 
    TEST_SUPPLIERID: String, 
    ratio_death_readm_low: String, 
    ratio_death_readm_high: String, 
    regions: String) 

object HiveDemo { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("HiveDemo") 
    val sc = new SparkContext(conf) 
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
    import hiveContext._ 
    val allZips = sc.textFile("/model-inputs/all_zip_state.csv").map(_.split(",")).map(p => AllZips(p(0), p(1), p(2), p(3), p(4), p(5), "")) 
    val allZipsSchemaRDD = createSchemaRDD(allZips) 
    allZipsSchemaRDD.saveAsTable("allzipstable") 
    } 
} 

Исключение на узле:

15/01/30 00:28:19 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: Unresolved plan found, tree: 
'CreateTableAsSelect None, allzipstable, false, None 
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36 
) 
Exception in thread "Driver" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved plan found, tree: 
'CreateTableAsSelect None, allzipstable, false, None 
LogicalRDD [ODSMEMBERID#0,ZIPCODE#1,STATE#2,TEST_SUPPLIERID#3,ratio_death_readm_low#4,ratio_death_readm_high#5,regions#6], MapPartitionsRDD[4] at mapPartitions at ExistingRDD.scala:36 

    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:83) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$$anonfun$1.applyOrElse(Analyzer.scala:78) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:144) 
    at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:135) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:78) 
    at org.apache.spark.sql.catalyst.analysis.Analyzer$CheckResolution$.apply(Analyzer.scala:76) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:61) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1$$anonfun$apply$2.apply(RuleExecutor.scala:59) 
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51) 
    at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60) 
    at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:34) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:59) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$apply$1.apply(RuleExecutor.scala:51) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at org.apache.spark.sql.catalyst.rules.RuleExecutor.apply(RuleExecutor.scala:51) 
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed$lzycompute(SQLContext.scala:411) 
    at org.apache.spark.sql.SQLContext$QueryExecution.analyzed(SQLContext.scala:411) 
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData$lzycompute(SQLContext.scala:412) 
    at org.apache.spark.sql.SQLContext$QueryExecution.withCachedData(SQLContext.scala:412) 
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan$lzycompute(SQLContext.scala:413) 
    at org.apache.spark.sql.SQLContext$QueryExecution.optimizedPlan(SQLContext.scala:413) 
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:418) 
    at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:416) 
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:422) 
    at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:422) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) 
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) 
    at org.apache.spark.sql.SchemaRDDLike$class.saveAsTable(SchemaRDDLike.scala:126) 
    at org.apache.spark.sql.SchemaRDD.saveAsTable(SchemaRDD.scala:108) 
    at com.healthagen.datapipe.ahm.HiveDemo$.main(HiveDemo.scala:22) 
    at com.healthagen.datapipe.ahm.HiveDemo.main(HiveDemo.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) 
15/01/30 00:28:19 INFO yarn.ApplicationMaster: Invoking sc stop from shutdown hook 

ответ

0

createSchemaRDD фрагмент кода выше работает отлично на искру 1.2.1

Был дефект CTAS в 1.2.0

+0

что такое дефект CTAS? – marlieg

1

Вы должны использовать HiveContext

Вот Java/Scala документы:

* Note that this currently only works with SchemaRDDs that are created from a HiveContext as 
    * there is no notion of a persisted catalog in a standard SQL context. 


    @Experimental 
    def saveAsTable(tableName: String): Unit = 
    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd 

Так в ваши изменения кода его :

val sc = new HiveContext(conf) 

На самом деле вы должны переименовать его в

val sqlc = new HiveContext(conf) 

FYI: подробнее о регистрации таблиц (в SQLContext): обратите внимание на таблицы преходящи, если сделать так:

/** 
    * Temporary tables exist only 
    * during the lifetime of this instance of SQLContext. 
    * 
    * @group userf 
    */ 
    def registerRDDAsTable(rdd: SchemaRDD, tableName: String): Unit = { 
    catalog.registerTable(Seq(tableName), rdd.queryExecution.logical) 
    } 

UPDATE Ваш новый StackTrace включает следующую фразу:

Unresolved plan found, tree: 

Это, как правило, означает, что у вас есть колония n, который не соответствует базовой таблице. Я буду смотреть дальше, чтобы увидеть, могу ли я изолировать, но в то же время вы могли бы также рассмотреть с этой точки зрения.

+0

Если вы не готовы наградить ответ еще, пожалуйста, рассмотрите upvote этот ответ как имеющий были полезны. – javadba

+0

совершенно! у вас есть идеи для исключения времени выполнения на узлах? –

+0

добавлен в мой ответ о проверке имен столбцов. – javadba