У меня есть RDD, элементы которого имеют тип (Long, String). По какой-то причине я хочу сохранить весь RDD в HDFS, а затем снова прочитать RDD в программе Spark. Можно ли это сделать? И если да, то как?Как сохранить RDD в HDFS, а затем прочитать его обратно?
ответ
Это возможно.
В RDD есть saveAsObjectFile
и saveAsTextFile
функции. Кортежи хранятся как (value1, value2)
, так что вы можете позже разобрать его.
Чтение может быть сделано с textFile
функцией от SparkContext, а затем .map
устранить ()
Итак: Версия 1:
rdd.saveAsTextFile ("hdfs:///test1/");
// later, in other program
val newRdds = sparkContext.textFile("hdfs:///test1/part-*").map (x => {
// here remove() and parse long/strings
})
Версия 2:
rdd.saveAsObjectFile ("hdfs:///test1/");
// later, in other program - watch, you have tuples out of the box :)
val newRdds = sparkContext.sc.sequenceFile("hdfs:///test1/part-*", classOf[Long], classOf[String])
Я бы рекомендовал используйте DataFrame, если ваш RDD находится в табличном формате. кадр данных представляет собой таблицу или двухмерную структуру, подобную массиву, в которой каждый столбец содержит измерения для одной переменной, и каждая строка содержит один случай. DataFrame имеет дополнительные метаданные из-за его табличного формата, что позволяет Spark выполнять определенные оптимизации по завершенному запросу. , где RDD - это гибкий распределенный набор данных, который представляет собой большую часть абстракции черных ящиков или ячеек данных, которые невозможно оптимизировать. Однако вы можете перейти от DataFrame к RDD и наоборот, и вы можете перейти от RDD к DataFrame (если RDD находится в табличном формате) с помощью метода toDF.
Ниже приведен пример для создания/хранить DataFrame в CSV и паркета в формате HDFS,
val conf = {
new SparkConf()
.setAppName("Spark-HDFS-Read-Write")
}
val sqlContext = new SQLContext(sc)
val sc = new SparkContext(conf)
val hdfs = "hdfs:///"
val df = Seq((1, "Name1")).toDF("id", "name")
// Writing file in CSV format
df.write.format("com.databricks.spark.csv").mode("overwrite").save(hdfs + "user/hdfs/employee/details.csv")
// Writing file in PARQUET format
df.write.format("parquet").mode("overwrite").save(hdfs + "user/hdfs/employee/details")
// Reading CSV files from HDFS
val dfIncsv = sqlContext.read.format("com.databricks.spark.csv").option("inferSchema", "true").load(hdfs + "user/hdfs/employee/details.csv")
// Reading PQRQUET files from HDFS
val dfInParquet = sqlContext.read.parquet(hdfs + "user/hdfs/employee/details")
VOW, то есть в чистом виде раствора :). Но как мы читаем, используя textFile, поскольку saveAsText создавал много разных файлов. – pythonic
@pythonic См. Мое обновление - вы можете прочитать диапазон файлов. Каждая часть RDD сохраняется в файле 'part-XYZŹŻ', поэтому мы можем читать только каждый файл такого имени –