У меня есть кластер Spark 2.0.2, с которым я сталкиваюсь через Pyspark через Jupyter Notebook. У меня есть несколько файлов txt с разделителями каналов (загружаемых в HDFS, но также доступных в локальном каталоге), которые мне нужно загрузить с использованием spark-csv в три отдельных фрейма данных, в зависимости от имени файла.Pyspark читает несколько файлов csv в dataframe (OR RDD?)
Я вижу три подхода я могу взять - или я могу использовать питон как-то перебирать каталог HDFS (не понял, как это сделать еще, загрузите каждый файл, а затем сделать союз
I. также известно, что существует некоторую подстановочные functionalty (см here) в искре - я, вероятно, могу использовать
Наконец, я мог бы использовать панда, чтобы загрузить файл ваниль CSV с диска как панды dataframe, а затем создать искры dataframe вниз. вот что эти файлы большие, и загрузка в память на одном узле может достигать ~ 8 гб (вот почему это перемещается в кластер в первую очередь).
Вот код, который я до сих пор, и некоторые псевдо-код для двух методов:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
sc = pyspark.SparkContext(appName = 'claims_analysis', master='spark://someIP:7077')
spark = SparkSession(sc)
#METHOD 1 - iterate over HDFS directory
for currFile in os.listdir(HDFS:///someDir//):
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df
#Method 2 - some kind of wildcard functionality
claim_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<claim>.csv')
pharm_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<pharm>.csv')
service_df = spark.read.format('com.databricks.spark.csv').options(delimiter = '|',header ='true',nullValue ='null').load('HDFS:///someDir//*<service>.csv')
#METHOD 3 - load to a pandas df and then convert to spark df
for currFile in os.listdir(HDFS:///someDir//)
pd_df = pd.read_csv(currFile, sep = '|')
df = spark.createDataFrame(pd_df)
if #filename contains 'claim':
#create or unionAll to merge claim_df
if #filename contains 'pharm':
#create or unionAll to merge pharm_df
if #filename contains 'service':
#create or unionAll to merge service_df
Кто-нибудь знает, как реализовать метод 1 или 2? Я не смог понять это. Кроме того, я был удивлен, что нет лучшего способа загрузить файлы csv в фреймворк pyspark - использование стороннего пакета для чего-то похожего на то, что это должна быть родная функция, смутила меня (я просто пропустил стандартный вариант использования для загрузки файлов csv в dataframe?) В конечном счете, я собираюсь написать объединенный единый фрейм данных обратно в HDFS (используя .write.parquet()), чтобы затем я мог очистить память и сделать некоторые аналитики с помощью MLlib. Если подход, который я выделил, не является лучшей практикой, я был бы признателен за толкание в правильном направлении!
Я думаю, что вы на правильном пути с № 2. Вы столкнулись с ошибкой или чем-то еще? Есть ли что-то в том, что вы пытались, что не сработало? – santon
Я продолжал получать файл, который не найден, поэтому я думаю, что проблема была в моей подстановочной реализации. Во-вторых, будут ли все файлы, соответствующие шаблону, автоматически объединяться? Я немного запутался в функциональности шаблона искры. – flyingmeatball
Да, Spark объединяет все записи во всех файлах, соответствующих шаблону. Если вы получаете не найденный файл, попробуйте только с жестким кодированием URI в один файл. – santon