2017-02-18 31 views
12

Я пытаюсь сохранить DataFrame в HDFS в формате паркета с использованием DataFrameWriter, распределяли тремя значениями столбцов, например:Как разбить и написать DataFrame в Spark без удаления разделов без новых данных?

dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path) 

Как уже упоминалось в this question, partitionBy удалит всю существующую иерархию разделов в path и заменили их на разделы в dataFrame. Поскольку новые инкрементные данные для определенного дня будут поступать периодически, я хочу заменить только те разделы в иерархии, для которых есть данные dataFrame, оставив остальные нетронутыми.

Для этого появляется мне нужно сохранить каждый раздел по отдельности, используя полный путь, что-то вроде этого:

singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890") 

Однако у меня возникают проблемы с пониманием, что лучший способ организации данных в монопольном раздела DataFrame s, чтобы я мог написать их, используя их полный путь. Одна из идей было что-то вроде:

dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ... 

Но foreachPartition работает на Iterator[Row], который не является идеальным для выписывания в формате Паркетная.

Я также рассмотрел использование select...distinct eventdate, hour, processtime для получения списка разделов, а затем фильтрацию исходного фрейма данных каждым из этих разделов и сохранение результатов до их полного секционированного пути. Но отдельный запрос и фильтр для каждого раздела не кажутся очень эффективными, поскольку было бы много операций фильтрации/записи.

Я надеюсь, что есть более чистый способ сохранить существующие разделы, для которых dataFrame нет данных?

Спасибо за чтение.

версия Spark: 2,1

ответ

0

Вы можете режим как Append попробовать.

dataFrame.write.format("parquet") 
.mode("append") 
.partitionBy("year","month") 
.option("path",s"$path/table_name") 
.saveAsTable(s"stg_table_name") 
1

Опция режима Append имеет улов!

df.write.partitionBy("y","m","d") 
.mode(SaveMode.Append) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName) 

Я протестировал и увидел, что это сохранит существующие файлы разделов. Однако на этот раз проблема следующая: если вы дважды запустите тот же код (с теми же данными), он создаст новые паркетные файлы вместо замены существующих для тех же данных (Spark 1.6). Таким образом, вместо использования Append, мы все еще можем решить эту проблему с помощью Overwrite. Вместо перезаписи на уровне таблицы мы должны перезаписать на уровне раздела.

df.write.mode(SaveMode.Overwrite) 
.parquet("/data/hive/warehouse/mydbname.db/" + tableName + "/y=" + year + "/m=" + month + "/d=" + day) 

Смотрите следующую ссылку для получения дополнительной информации:

Overwrite specific partitions in spark dataframe write method

(Я обновил свой ответ после комментария suriyanto в Thnx.).

+0

ли проверить вас, если, когда вы пишете то же самое данных дважды, что он заменяет старый раздел? Из моего теста он фактически создает новый файл паркета внутри каталога раздела, в результате чего данные удваиваются. Я на Spark 2.2. – suriyanto