2017-01-10 8 views
0

У меня есть некоторые ежедневные данные для сохранения в несколько папок (в основном основанных на времени). теперь у меня есть два формата для хранения файлов, один из которых - паркет, а другой - csv, я хотел бы сохранить формат паркета, чтобы сэкономить некоторое пространство. структура папки, как следующее:как исправить 2.0 чтение mutli папок паркет как csv

[[email protected] raw]# tree 
. 
├── entityid=10001 
│   └── year=2017 
│    └── quarter=1 
│     └── month=1 
│      ├── day=6 
│      │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
│      └── day=7 
│       └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
├── entityid=100055 
│   └── year=2017 
│    └── quarter=1 
│     └── month=1 
│      ├── day=6 
│      │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
│      └── day=7 
│       └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
├── entityid=100082 
│   └── year=2017 
│    └── quarter=1 
│     └── month=1 
│      ├── day=6 
│      │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
│      └── day=7 
│       └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
└── entityid=10012 
    └── year=2017 
     └── quarter=1 
      └── month=1 
       ├── day=6 
       │   └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 
       └── day=7 
        └── part-r-00000-84f964ec-f3ea-46fd-9fe6-8b36c2433e8e.snappy.parquet 

теперь у меня есть питон список хранит все папки должны быть считаны, предположим, что каждый раз запускать его нужно читать только некоторые из базовых папок на условиях фильтра.

folderList=df_inc.collect() 
folderString=[] 

for x in folderList: 
    folderString.append(x.folders) 
In [44]: folderString 
Out[44]: 
[u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=7', 
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=6', 
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=7', 
u'/data/raw/entityid=100055/year=2017/quarter=1/month=1/day=6', 
u'/data/raw/entityid=100082/year=2017/quarter=1/month=1/day=6', 
u'/data/raw/entityid=10012/year=2017/quarter=1/month=1/day=7'] 

файлы были прописаны по:

df_join_with_time.coalesce(1).write.partitionBy("entityid","year","quarter","month","day").mode("append").parquet(rawFolderPrefix) 

когда я пытаюсь прочитать папки, хранящиеся в folderString по df_batch=spark.read.parquet(folderString) ошибке java.lang.ClassCastException: java.util.ArrayList не может быть приведен к Java. lang.String.

Если я сохраню файлы в формате csv и прочитал его через код, он просто отлично работает, как показано ниже: пожалуйста, если в любом случае прочитать список файлов для палитры паркета, оцените!

In [46]: folderList=df_inc.collect() 
    ...: folderString=[] 
    ...: 
    ...: for x in folderList: 
    ...:  folderString.append(x.folders) 
    ...: df_batch=spark.read.csv(folderString) 
    ...: 

In [47]: df_batch.show() 
+------------+---+-------------------+----------+----------+ 
|   _c0|_c1|    _c2|  _c3|  _c4| 
+------------+---+-------------------+----------+----------+ 
|6C25B9C3DD54| 1|2017-01-07 00:00:01|1483718401|1483718400| 
|38BC1ADB0164| 3|2017-01-06 00:00:01|1483632001|1483632000| 
|38BC1ADB0164| 3|2017-01-07 00:00:01|1483718401|1483718400| 

ответ

0

Я получил это решается:

df=spark.read.parquet(folderString[0]) 
y=0 
for x in folderString: 
    if y>0: 
     df=df.union(spark.read.parquet(x)) 
    y=y+1 

это очень некрасиво решение, если у вас есть хорошая идея, пожалуйста, дайте мне знать. большое спасибо.

несколько дней спустя, нашел идеальный способ решить эту проблему:

df=spark.read.parquet(*folderString) 
+0

Но если вы укажете на необработанную папку. Как 'spark.read.паркет ('/ data/raw') 'будет работать правильно. –

+0

спасибо за комментарий, я не хочу читать все папки под/data/raw, только некоторые из них хранятся в переменной списка folderString –

1

Вы столкнулись с промахом пониманием раздела в Hadoop и паркете.

См., У меня есть простая файловая структура, разбитая на год. Это походит на это:

my_folder 
. 
├── year-month=2016-12 
| └── my_files.parquet 
├── year-month=2016-11 
| └── my_files.parquet 

Если я сделать чтение из my_folder без фильтра в моем читателя dataframe как это:

df = saprk.read.parquet("path/to/my_folder") 
df.show() 

Если проверить искровой DAG визуализации вы можете видеть, что в этом случае он будет читать все мои разделы, как вы сказали:

enter image description here

в приведенном выше случае, каждой точке в первом квадрате о ne раздел моих данных.

Но если я изменить свой код к этому:

df = saprk.read.parquet("path/to/my_folder")\ 
      .filter((col('year-month') >= lit(my_date.strftime('%Y-%m'))) & 
        (col('year-month') <= lit(my_date.strftime('%Y-%m')))) 

визуализация DAG покажет, сколько разделов я использую:

enter image description here

Итак, если вы отфильтровать по столбцу это раздел, который вы не будете читать все файлы. Просто, что вам нужно, вам не нужно использовать это решение для чтения одной папки по папке.

+0

Привет, Thiago, спасибо за ответ, моя проблема в том, что папки Мне нужно читать случайно. он полностью основан на сегодняшнем инкрементном файле, и файл может содержать данные любого дня, я мог бы определить их по сегодняшнему файлу путем группировки по дням. для этого используется df_inc. как только я узнаю, что в тот день (безусловно, включенный в него сегодня) нужно переделать, я буду читать как файл истории, так и сегодняшнее изменение. поэтому я хотел бы прочитать все файлы, содержащие df_inc, и не мог предоставить точный фильтр. тот же самый метод отлично работает, если я прочитал метод csv. –

 Смежные вопросы

  • Нет связанных вопросов^_^