Я использую песочницу hortonworks в Azure с искрой 1.6. У меня есть база данных Hive, заполненная данными выборок TCP-DS. Я хочу прочитать некоторые SQL-запросы из внешних файлов и запустить их в наборе данных улья в иске. Я следую этой теме Using hive database in spark, которая просто использует таблицу в моем наборе данных, а также снова записывает SQL-запрос в искру, но мне нужно определить целое, набор данных в качестве источника для запроса на это, я думаю, что я должен использовать dataframes, но я не уверен и не знаю, как! также хочу импортировать SQL-запрос из внешнего .sql-файла и не записывать запрос еще раз! не могли бы вы направить меня, как я могу это сделать? большое спасибо, bests!Как использовать всю базу данных улья в искровом режиме и читать sql-запросы из внешних файлов?
ответ
Spark Может считывать данные непосредственно из таблицы Hive. Вы можете создавать, перетаскивать таблицу Hive с помощью Spark, и даже вы можете выполнять все связанные с Hive hql операции через Spark. Для этого нужно использовать искру HiveContext
Из документации Spark:
Спарк HiveContext, обеспечивает супернабор функциональных возможностей, предоставляемых базовой SQLContext. Дополнительные функции включают возможность записи запросов с использованием более полного анализатора HiveQL, доступа к UUF Hive и возможности чтения данных из таблиц Hive. Чтобы использовать HiveContext, вам не нужно иметь существующую настройку Hive.
Для получения дополнительной информации вы можете посетить Spark Documentation
Чтобы избежать написания SQL в коде, вы можете использовать файл свойств, где вы можете поместить все ваши улей запрос, а затем вы можете использовать ключ в вас код.
См. Ниже реализацию Spark HiveContext и использование файла свойств в Spark Scala.
package com.spark.hive.poc
import org.apache.spark._
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql._
import org.apache.spark._
import org.apache.spark.sql.DataFrame;
import org.apache.spark.rdd.RDD;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.hive.HiveContext;
//Import Row.
import org.apache.spark.sql.Row;
//Import Spark SQL data types
import org.apache.spark.sql.types.{ StructType, StructField, StringType };
object ReadPropertyFiles extends Serializable {
val conf = new SparkConf().setAppName("read local file");
conf.set("spark.executor.memory", "100M");
conf.setMaster("local");
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
def main(args: Array[String]): Unit = {
var hadoopConf = new org.apache.hadoop.conf.Configuration();
var fileSystem = FileSystem.get(hadoopConf);
var Path = new Path(args(0));
val inputStream = fileSystem.open(Path);
var Properties = new java.util.Properties;
Properties.load(inputStream);
//Create an RDD
val people = sc.textFile("/user/User1/spark_hive_poc/input/");
//The schema is encoded in a string
val schemaString = "name address";
//Generate the schema based on the string of schema
val schema =
StructType(
schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)));
//Convert records of the RDD (people) to Rows.
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim));
//Apply the schema to the RDD.
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);
peopleDataFrame.printSchema();
peopleDataFrame.registerTempTable("tbl_temp")
val data = sqlContext.sql(Properties.getProperty("temp_table"));
//Drop Hive table
sqlContext.sql(Properties.getProperty("drop_hive_table"));
//Create Hive table
sqlContext.sql(Properties.getProperty("create_hive_tavle"));
//Insert data into Hive table
sqlContext.sql(Properties.getProperty("insert_into_hive_table"));
//Select Data into Hive table
sqlContext.sql(Properties.getProperty("select_from_hive")).show();
sc.stop
}
}
запись в Свойства файла:
temp_table=select * from tbl_temp
drop_hive_table=DROP TABLE IF EXISTS default.test_hive_tbl
create_hive_tavle=CREATE TABLE IF NOT EXISTS default.test_hive_tbl(name string, city string) STORED AS ORC
insert_into_hive_table=insert overwrite table default.test_hive_tbl select * from tbl_temp
select_from_hive=select * from default.test_hive_tbl
Спарк отправить команду, чтобы запустить эту работу:
[[email protected] ~]$ spark-submit --num-executors 1 \
--executor-memory 100M --total-executor-cores 2 --master local \
--class com.spark.hive.poc.ReadPropertyFiles Hive-0.0.1-SNAPSHOT-jar-with-dependencies.jar \
/user/User1/spark_hive_poc/properties/sql.properties
Примечание: Свойство расположение файла должно быть HDFS место.
Спасибо Дорогой @ sandeep-singh. на самом деле у меня есть две разные базы данных (как с одинаковыми таблицами) в улье. Я немного смущен, как выбрать мою предпочтительную базу данных? ps, в Hive Я использовал вот так: используйте DB1, но я не знаю, как это сделать здесь. спасибо –
В запросе я использовал 'default.test_hive_tbl', где' default' - моя база данных, а 'test_hive_tb' - мое имя таблицы. в вашем запросе вы также свяжете свое имя_базы с табличным именем. –
Вам не нужно писать 'use
Вы можете прочитать свой запрос из файлов свойств. –