2017-02-20 25 views
1

При использовании PySpark для загрузки нескольких файлов JSON с S3 я получаю сообщение об ошибке, а Spark-запрос завершается с ошибкой, если файл отсутствует.Ошибка выполнения PySpark при загрузке нескольких файлов, а одна отсутствует

вызвано следующими причинами: org.apache.hadoop.mapred.InvalidInputException: Входной шаблон S3N: //example/example/2017-02-18/*.json матчи 0 файлов

Это как Я добавляю 5 последних дней на работу с PySpark.

days = 5 
x = 0 
files = [] 

while x < days: 
    filedate = (date.today() - timedelta(x)).isoformat() 
    path = "s3n://example/example/"+filedate+"/*.json" 
    files.append(path) 
    x += 1 

rdd = sc.textFile(",".join(files))      
df = sql_context.read.json(rdd, schema) 

Как я могу заставить PySpark игнорировать недостающие файлы и продолжить работу?

ответ

1

Используйте функцию, которая пытается загрузить файл, если файл отсутствует, он терпит неудачу и возвращает False.

from py4j.protocol import Py4JJavaError 

def path_exist(sc, path): 
    try: 
     rdd = sc.textFile(path) 
     rdd.take(1) 
     return True 
    except Py4JJavaError as e: 
     return False 

Это позволяет проверить, если файлы доступны, прежде чем добавлять их в свой список без необходимости использовать команды AWS Cli или S3.

days = 5 
x = 0 
files = [] 

while x < days: 
    filedate = (date.today() - timedelta(x)).isoformat() 
    path = "s3n://example/example/"+filedate+"/*.json" 
    if path_exist(sc, path): 
     files.append(path) 
    else: 
     print('Path does not exist, skipping: ' + path) 
    x += 1 

rdd = sc.textFile(",".join(files))      
df = sql_context.read.json(rdd, schema) 

Я нашел это решение в http://www.learn4master.com/big-data/pyspark/pyspark-check-if-file-exists