2016-10-26 1 views
1

У меня есть много структурированных данных, которые хранятся в очень значимом ключе, и я хочу обработать его в одинаковом смысле полный и эффективный способ.Hadoop/Spark Read Многие файлы CSV

+- some-hdfs-path/ 
    +- level-1_var-01/ 
    | +- level-2_var-001.csv 
    | +- ... 
    | +- level-2_var-nnn.csv 
    +- level-1_var-02/ 
    | +- level-2_other-001.csv 
    | +- ... 
    | +- level-2_other-mmm.csv 
    +- .../
    +- level-1_var-nn/ 
    | +- ... 

Каждый файл имеет размер около 100 МБ и имеет около 1 000 000 строк. Количество файлов, обычно около 100, меняется в каждом каталоге, а также имена файлов. Другими словами, я не знаю, сколько файлов или то, что они называются, но мне нужны их имена и, очевидно, их контент.

У меня возникли проблемы с RDD, с которых я получаю от sc.textFile("/some-hdfs-path/level-1_var-01/*.csv") и sc.wholeTextFiles("/some-hdfs-path/level-1_var-01").

Общая цель - фактически получить первую и последнюю строку из каждого файла в каталогах уровня 1_var /. Объедините результаты для каждого уровня-1_var, затем вернитесь и выпишите весь новый набор файлов для каждого уровня-1_var/в some-other-hdfs-path/level-1-var/

Я новичок в Hadoop/Искра и использование RDD. Я прочитал documentation для вышеупомянутых двух функций, но я все еще запутался в том, как перебирать RDD, которые я получаю и выполняю обработку.

EDIT: файлы содержат данные временных рядов, поэтому объединение содержимого файлов в каждом каталоге нежелательно. Я открыт для добавления содержимого файлов в виде дополнительных столбцов в один гигантский фреймворк данных, но не как строки.

+0

посмотреть на sparkSql или улей для запроса вашей RDD. Его в значительной степени похожи на синтаксис SQL (SELECT * FROM yourRDD LIMIT 1), который получит первую строку. –

+0

, так как я могу выбрать различные файлы, хранящиеся в одном rdd, чтобы запросить их? – Constantine

+0

создайте внешнюю таблицу, где Hive указывает на _some-hdfs-path/_ и используйте 'INPUT__FILE__NAME', чтобы получить фактическое имя файла при запросе данных с помощью SparkSQL или Hive. – cheseaux

ответ

0

Используйте этот код для чтения CSV в pySpark путем замены ваши конфигурации и свойства.

from pyspark.sql import SparkSession 
from pyspark.sql import Row 

def get_first_and_last(filename): 
    #rdd variable holds the content of file(it's distributed) 
    rdd = spark.read.csv(filename, header=True, mode="DROPMALFORMED").rdd 

    #Here filename holds abs path. Feel free to substring as per your needs 
    return Row(filename, rdd.first, rdd.take(rdd.count()).last()) 


spark = SparkSession \ 
    .builder \ 
    .appName("Read CSVs") \ 
    .config("spark.some.config.option", "some-value") \ 
    .getOrCreate() 

# This file list is not distributed one, It holds list of filenames only 
filesList = spark.sparkContext\ 
    .wholeTextFiles("/some-hdfs-path/level-*_var-*/*.csv")\ 
    .map(lambda x: x[0])\ 
    .collect() 

#output array 
records = filesList.map(get_first_and_last) 

for record in records: 
    print(record) 

Я пробовал эквивалентный код в scala, и я могу видеть результаты по мере необходимости.

Редактировать: Добавлен другой подход в соответствии с комментариями.

ПРИМЕЧАНИЕ: Маленькие файлы предпочтительны, когда используется sparkContext.wholeTextFiles(), так как каждый файл будет полностью загружен в память. documentation

records = spark.sparkContext\ 
    .wholeTextFiles("/some-hdfs-path/level-*_var-*/*.csv")\ 
    .map(lambda x : Row(x[0], x[1].split("\\n")[0], x[1].split("\\n")[-1]))\ 

for record in records.collect(): 
    print(record) 

pySpark - SparkSession

+0

Как выглядит структура 'df' в случае нескольких CSV-файлов? – Constantine

+0

Структура 'df' будет зависеть от определения' schema'. 1) Нет никакой разницы при чтении CSV или списка файлов CSV. 2) Но схема должна быть одинаковой для всех CSV-файлов в каталоге, если мы хотим обрабатывать их на растяжке. – mrsrinivas

+0

, поэтому содержимое разных файлов будет объединено в единый кадр данных? – Constantine

0

Вы можете использовать SparkSession объект искры 2.0 и дать каталог в формате CSV следует за

val df =spark.read.csv(pathOfDirectory) 

выше DF будет иметь данные всех CSV-в каталоге

+0

разве это не синтаксис scala? ... и не соответствует ли строка 'some_rdd = sc.wholeTextFiles ("/some-hdfs-path/level-1_var-01 ")'?Я не уверен – Constantine