0

У меня есть кластер Spark 2.0.2, с которым я сталкиваюсь через Pyspark через Jupyter Notebook. У меня есть несколько файлов txt с разделителями каналов (загружаемых в HDFS, но также доступных в локальном каталоге), которые мне нужно загрузить с использованием spark-csv в три отдельных фрейма данных, в зависимости от имени файла.Pyspark читает несколько файлов csv в dataframe (OR RDD?)

Я вижу три подхода я могу взять - или я могу использовать питон как-то перебирать каталог HDFS (не понял, как это сделать еще, загрузите каждый файл, а затем сделать союз

I. также известно, что существует некоторую подстановочные functionalty (см here) в искре - я, вероятно, могу использовать

Наконец, я мог бы использовать панда, чтобы загрузить файл ваниль CSV с диска как панды dataframe, а затем создать искры dataframe вниз. вот что эти файлы большие, и загрузка в память на одном узле может достигать ~ 8 гб (вот почему это перемещается в кластер в первую очередь).

Вот код, который я до сих пор, и некоторые псевдо-код для двух методов:

import findspark 
findspark.init() 
import pyspark 
from pyspark.sql import SparkSession 
import pandas as pd 

sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077') 

spark = SparkSession(sc) 

#METHOD 1 - iterate over HDFS directory 
for currFile in os.listdir(HDFS:///someDir//): 
    if #filename contains 'claim': 
     #create or unionAll to merge claim_df 
    if #filename contains 'pharm': 
     #create or unionAll to merge pharm_df 
    if #filename contains 'service': 
     #create or unionAll to merge service_df 

#Method 2 - some kind of wildcard functionality 
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv') 
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv') 
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv') 


#METHOD 3 - load to a pandas df and then convert to spark df 
for currFile in os.listdir(HDFS:///someDir//) 
    pd_df = pd.read_csv(currFile, sep = '|') 
    df = spark.createDataFrame(pd_df) 
    if #filename contains 'claim': 
     #create or unionAll to merge claim_df 
    if #filename contains 'pharm': 
     #create or unionAll to merge pharm_df 
    if #filename contains 'service': 
     #create or unionAll to merge service_df 

Кто-нибудь знает, как реализовать метод 1 или 2? Я не смог понять это. Кроме того, я был удивлен, что нет лучшего способа загрузить файлы csv в фреймворк pyspark - использование стороннего пакета для чего-то похожего на то, что это должна быть родная функция, смутила меня (я просто пропустил стандартный вариант использования для загрузки файлов csv в dataframe?) В конечном счете, я собираюсь написать объединенный единый фрейм данных обратно в HDFS (используя .write.parquet()), чтобы затем я мог очистить память и сделать некоторые аналитики с помощью MLlib. Если подход, который я выделил, не является лучшей практикой, я был бы признателен за толкание в правильном направлении!

+0

Я думаю, что вы на правильном пути с № 2. Вы столкнулись с ошибкой или чем-то еще? Есть ли что-то в том, что вы пытались, что не сработало? – santon

+0

Я продолжал получать файл, который не найден, поэтому я думаю, что проблема была в моей подстановочной реализации. Во-вторых, будут ли все файлы, соответствующие шаблону, автоматически объединяться? Я немного запутался в функциональности шаблона искры. – flyingmeatball

+0

Да, Spark объединяет все записи во всех файлах, соответствующих шаблону. Если вы получаете не найденный файл, попробуйте только с жестким кодированием URI в один файл. – santon

ответ

4

Подход 1:

В питона вы не можете напрямую обратиться к HDFS месте. Вам нужно обратиться за помощью к другой библиотеке, такой как pydoop. В scala и java у вас есть API. Даже с pydoop вы будете читать файлы по одному. Плохо читать файлы по одному, а не использовать параметр параллельного чтения, предоставляемый искру.

подход 2:

Вы должны быть в состоянии указать на несколько файлах с запятой или с джокером. Таким образом, искра заботится о чтении файлов и распространяет их на разделы. Но если вы идете с опцией union с каждым фреймом данных, есть один крайний случай, когда вы динамически читаете каждый файл. Когда у вас много файлов, список может стать настолько огромным на уровне драйвера и может вызвать проблемы с памятью. Основная причина в том, что процесс чтения все еще происходит на уровне водителя.

Этот вариант лучше. Искра будет читать все файлы, связанные с регулярным выражением, и преобразовывать их в разделы. Вы получаете один RDD для всех подстановочные матчей, а оттуда вам не нужно беспокоиться о союзе для

Пример кода cnippet отдельных РДД в:

distFile = sc.textFile("/hdfs/path/to/folder/fixed_file_name_*.csv") 

Подход 3:

Если у вас есть некоторые устаревшие приложения в python, который использует черты pandas, я бы предпочел использовать искровое обеспечение API

+0

Спасибо за ответ. Значит, вы рекомендуете вариант 2. Меня меньше беспокоит количество файлов, чем размер файлов. Будет ли подстановочный файл изначально добавлять файлы вместе? Например, если есть 3 файла, которые соответствуют шаблону, автоматически ли он объединяет их для меня или возвращает список из трех отдельных файлов? – flyingmeatball

+0

обновил мой ответ для подхода 2 – Ramzy