Я пытаюсь сохранить DataFrame
в HDFS в формате паркета с использованием DataFrameWriter
, распределяли тремя значениями столбцов, например:Как разбить и написать DataFrame в Spark без удаления разделов без новых данных?
dataFrame.write.mode(SaveMode.Overwrite).partitionBy("eventdate", "hour", "processtime").parquet(path)
Как уже упоминалось в this question, partitionBy
удалит всю существующую иерархию разделов в path
и заменили их на разделы в dataFrame
. Поскольку новые инкрементные данные для определенного дня будут поступать периодически, я хочу заменить только те разделы в иерархии, для которых есть данные dataFrame
, оставив остальные нетронутыми.
Для этого появляется мне нужно сохранить каждый раздел по отдельности, используя полный путь, что-то вроде этого:
singlePartition.write.mode(SaveMode.Overwrite).parquet(path + "/eventdate=2017-01-01/hour=0/processtime=1234567890")
Однако у меня возникают проблемы с пониманием, что лучший способ организации данных в монопольном раздела DataFrame
s, чтобы я мог написать их, используя их полный путь. Одна из идей было что-то вроде:
dataFrame.repartition("eventdate", "hour", "processtime").foreachPartition ...
Но foreachPartition
работает на Iterator[Row]
, который не является идеальным для выписывания в формате Паркетная.
Я также рассмотрел использование select...distinct eventdate, hour, processtime
для получения списка разделов, а затем фильтрацию исходного фрейма данных каждым из этих разделов и сохранение результатов до их полного секционированного пути. Но отдельный запрос и фильтр для каждого раздела не кажутся очень эффективными, поскольку было бы много операций фильтрации/записи.
Я надеюсь, что есть более чистый способ сохранить существующие разделы, для которых dataFrame
нет данных?
Спасибо за чтение.
версия Spark: 2,1
ли проверить вас, если, когда вы пишете то же самое данных дважды, что он заменяет старый раздел? Из моего теста он фактически создает новый файл паркета внутри каталога раздела, в результате чего данные удваиваются. Я на Spark 2.2. – suriyanto