2016-12-15 5 views
0

я выполнить следующий код PySpark:Почему Spark работает в Google Dataproc хранит временные файлы на внешнем хранилище (GCS) вместо локального диска или HDFS при использовании saveAsTextFile?

from pyspark import SparkContext 

sc = SparkContext() 

data = sc.textFile('gs://bucket-name/input_blob_path') 
sorted_data = data.sortBy(lambda x: sort_criteria(x)) 
sorted_data.saveAsTextFile(
    'gs://bucket-name/output_blob_path', 
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec" 
) 

Работа выполнена успешно. Однако во время выполнения задания Spark создал много временных капель по следующему пути gs://bucket-name/output_blob_path/_temporary/0/. Я понял, что удаление всех этих временных блоков в конце заняло половину времени выполнения задания, а загрузка процессора на 1% за это время (огромная трата ресурсов).

Есть ли способ хранить временные файлы на локальном диске (или HDFS) вместо GCP? Я все равно хотел бы сохранить окончательные результаты (отсортированный набор данных) для GCP.

Мы использовали Dataproc Spark cluster (VM type 16cores, 60GM) с 10 рабочими узлами. Объем входных данных составлял 10 ТБ.

ответ

1

_temporary Файлы, которые вы видите, являются артефактом FileOutputCommitter, используемым под капотом. Важно отметить, что эти временные капли не были строго «временными» данными, но на самом деле были завершены выходные данные, которые только «переименовываются» в конечный пункт назначения при завершении задания. «Фиксировать» эти файлы с помощью переименования на самом деле быстро, потому что и источник, и пункт назначения находятся на GCS; по этой причине нет возможности заменить эту часть рабочего процесса размещением временных файлов на HDFS и затем «совершить» в GCS, потому что тогда фиксация потребует повторной установки всего выходного набора данных из HDFS в GCS. И, в частности, базовые классы Hadoop FileOutputFormat не поддерживают такую ​​идиому.

GCS сам по себе не является реальной файловой системой, но является «хранилищем объектов», а соединитель GCS внутри Dataproc только имитирует HDFS в меру своих возможностей. Одним из следствий является то, что удаление заполнения каталога файлами на самом деле требует, чтобы GCS удалял отдельные объекты под капотом, а не реальную файловую систему, просто отменяя индексный дескриптор.

На практике, если вы делаете это, это, вероятно, означает, что ваш выход так или иначе разбит на слишком много файлов, так как очистка происходит в партиях ~ 1000 файлов за раз. Таким образом, до десятков тысяч выходных файлов обычно не должно быть заметно медленным. Слишком много файлов также замедлит работу над этими файлами в будущем. Самое простое решение, как правило, это просто уменьшить количество выходных файлов, когда это возможно, например, с помощью repartition():

from pyspark import SparkContext 

sc = SparkContext() 

data = sc.textFile('gs://bucket-name/input_blob_path') 
sorted_data = data.sortBy(lambda x: sort_criteria(x)) 
sorted_data.repartition(1000).saveAsTextFile(
    'gs://bucket-name/output_blob_path', 
    compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec" 
) 
+0

Спасибо за объяснение. Я немного удивлен, что будет слишком много файлов, так как мы сортируем данные, которые были экспортированы из BigQuery в GCS. Мое предположение заключалось в том, что функция экспорта BiqQuery уже оптимизирует количество разделов (оптимальное количество файлов для хранения набора данных в GCS). – user2548047

+0

В зависимости от применяемых видов RDD количество разделов после преобразования может не совпадать с количеством входных разделов, а также в этом случае FileInputFormat будет по умолчанию разбивать входные файлы на более мелкие разделы независимо от количество входных файлов. Вы можете настроить это с помощью '--properties spark.hadoop.fs.gs.block.size = 536870912', чтобы увеличить до 512 Мбайт, например, вместо значения по умолчанию 64 МБ. –

+0

Вы также можете настроить это значение по умолчанию во время развертывания кластера. 'gcloud dataproc кластеры создают кластер my-cluster --properties: fs.gs.block.size = 536870912' будет разумным, если ваши задания обычно находятся в диапазоне 10 ТБ. Это было бы слишком высоко, если бы ваши рабочие места были, скажем, 10 ГБ. В большинстве случаев лучше ориентироваться на более чем 1000 и меньше, чем, возможно, 50000 разделов, но также, как правило, не хотят идти на меньший размер блока, чем 64 МБ даже для небольших заданий. –