Я установил искровой кластер с мастером и двумя подчиненными (я использую Spark Standalone). Кластер работает хорошо с некоторыми примерами, но не с моим приложением. Мой рабочий процесс приложений заключается в том, что он будет читать csv -> извлекать каждую строку в csv вместе с заголовком -> конвертировать в JSON -> сохранять на S3. Вот мой код:PySpark - Spark clusters EC2 - не удалось сэкономить до S3
def upload_func(row):
f = row.toJSON()
f.saveAsTextFile("s3n://spark_data/"+ row.name +".json")
print(f)
print(row.name)
if __name__ == "__main__":
spark = SparkSession \
.builder \
.appName("Python Spark SQL data source example") \
.getOrCreate()
df = spark.read.csv("sample.csv", header=True, mode="DROPMALFORMED")
df.rdd.map(upload_func)
Я также экспортировать AWS_Key_ID
и AWS_Secret_Key
в окружающую среду ec2. Однако с приведенным выше кодом мое приложение не работает. Ниже приведены вопросы:
Файлы JSON не сохраняются в S3, я попытался запустить приложение несколько раз, а также перезагрузить страницу S3, но нет данных. Приложение завершилось без ошибок в журнале. Кроме того,
print(f)
иprint(row.name)
не распечатываются в журнале. Что мне нужно исправить, чтобы получить JSON-сохранение на S3 и есть ли в любом случае для меня печать в журнале для отладки?В настоящее время мне нужно поместить файл csv в рабочий узел, чтобы приложение могло читать файл csv. Как я могу поместить файл в другое место, скажем, мастер-узел и когда приложение запустится, он разделит файл csv на все рабочие узлы, чтобы они могли выполнять параллельную загрузку в качестве распределенной системы?
Справка действительно оценена. Заранее благодарны за Вашу помощь.
ОБНОВЛЕНО
После ввода Logger для отладки, я определил вопрос о том, что функция карты upload_func()
не вызываются, или приложение не может попасть внутрь этой функции (Logger распечатанных сообщений до и после вызова функции) , Пожалуйста, помогите, если вы знаете причину, почему?