2016-11-15 5 views
2

Я пытаюсь использовать Spark для преобразования кучи файлов csv в паркет, с интересным случаем, что входные файлы csv уже «разделены» по каталогам. Все входные файлы имеют одинаковый набор столбцов. Структура входных файлов выглядит следующим образом:Преобразование CSV в паркет с использованием Spark, сохранение разделов

/path/dir1/file1.csv 
/path/dir1/file2.csv 
/path/dir2/file3.csv 
/path/dir3/file4.csv 
/path/dir3/file5.csv 
/path/dir3/file6.csv 

Я хотел бы, чтобы прочитать эти файлы с искрой и записывать свои данные в таблицу паркетной в HDFS, сохраняя разбиение (разделенное на входе каталог), и такие, как есть представляет собой один выходной файл для каждого раздела. Выходные файлы strucutre должны выглядеть следующим образом:

hdfs://path/dir=dir1/part-r-xxx.gz.parquet 
hdfs://path/dir=dir2/part-r-yyy.gz.parquet 
hdfs://path/dir=dir3/part-r-zzz.gz.parquet 

Лучшее решение, которое я нашел до сих пор является петлей среди входных каталогов, загрузка файлов CSV в dataframe и написать dataframe в целевом разделе в паркет. Но это неэффективно, так как я хочу один выходной файл на раздел, запись в hdfs - это одна задача, которая блокирует цикл. Интересно, как достичь этого с максимальным параллелизмом (и без перетасовки данных в кластере).

Спасибо!

ответ

0

Переименуйте свои входные каталоги, изменив dirX на dir=dirX. Затем выполните:

spark.read.csv('/path/').coalesce(1).write.partitionBy('dir').parquet('output') 

Если вы не можете переименовать каталоги, вы можете использовать Hive Metastore. Создайте внешнюю таблицу и один раздел для каждого каталога. Затем загрузите эту таблицу и перепишите с использованием вышеприведенного шаблона.

+0

Это искра 2.0, правильно? Я все еще на 1.6, и я использую считыватель csv databricks. Вы уверены, что этот метод сделает так, что все файлы csv из одного и того же каталога ввода будут загружены одним и тем же узлом? В противном случае coalesce создаст перетасовку. – benoitdr

+0

Да, с 1.X вам нужно загрузить пакет spark-csv, и он тоже должен работать. – Mariusz

+0

Он работает с пакетом spark-csv, но он создает перетасовку, поскольку он не обеспечивает локальность данных во входных каталогах. – benoitdr

0

Лучшее решение, которое я не нашел до сих пор (не перетасовки и так много потоков в качестве входных директорий):

  • Создание РДУ входных директорий, с таким количеством перегородок в качестве входных директорий

  • Преобразовать его в РДУ входных файлов (сохраняющие разделы, директории)

  • плоскопанельной на карту с пользовательской CSV парсером

  • Преобразовать РДД в dataframe

  • Написать dataframe в паркет таблицу, разделенную по

    директорий

Он требует, чтобы написать свой собственный парсер. Я не мог найти решение для сохранения разбиения с использованием sc.textfile или парсера csv databricks.