1

Я установил искровой кластер с мастером и двумя подчиненными (я использую Spark Standalone). Кластер работает хорошо с некоторыми примерами, но не с моим приложением. Мой рабочий процесс приложений заключается в том, что он будет читать csv -> извлекать каждую строку в csv вместе с заголовком -> конвертировать в JSON -> сохранять на S3. Вот мой код:PySpark - Spark clusters EC2 - не удалось сэкономить до S3

def upload_func(row): 
    f = row.toJSON() 
    f.saveAsTextFile("s3n://spark_data/"+ row.name +".json") 
    print(f) 
    print(row.name) 

if __name__ == "__main__": 
    spark = SparkSession \ 
     .builder \ 
     .appName("Python Spark SQL data source example") \ 
     .getOrCreate() 
    df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED") 
    df.rdd.map(upload_func) 

Я также экспортировать AWS_Key_ID и AWS_Secret_Key в окружающую среду ec2. Однако с приведенным выше кодом мое приложение не работает. Ниже приведены вопросы:

  1. Файлы JSON не сохраняются в S3, я попытался запустить приложение несколько раз, а также перезагрузить страницу S3, но нет данных. Приложение завершилось без ошибок в журнале. Кроме того, print(f) и print(row.name) не распечатываются в журнале. Что мне нужно исправить, чтобы получить JSON-сохранение на S3 и есть ли в любом случае для меня печать в журнале для отладки?

  2. В настоящее время мне нужно поместить файл csv в рабочий узел, чтобы приложение могло читать файл csv. Как я могу поместить файл в другое место, скажем, мастер-узел и когда приложение запустится, он разделит файл csv на все рабочие узлы, чтобы они могли выполнять параллельную загрузку в качестве распределенной системы?

Справка действительно оценена. Заранее благодарны за Вашу помощь.

ОБНОВЛЕНО

После ввода Logger для отладки, я определил вопрос о том, что функция карты upload_func() не вызываются, или приложение не может попасть внутрь этой функции (Logger распечатанных сообщений до и после вызова функции) , Пожалуйста, помогите, если вы знаете причину, почему?

ответ

0

необходимо принудительно оценить карту; искра будет выполнять только работу по требованию.

df.rdd.map(upload_func).count() должно это сделать