6

У меня есть данные JSON, которые я читаю в фрейме данных с несколькими полями, переделя его на основе двух столбцов и преобразовывая в Pandas.Pyspark simple re-partition и toPandas() не удается завершить только на 600 000 + строк

Эта работа не срабатывает при EMR на 600 000 строк данных с некоторыми неясными ошибками. Я также увеличил настройки памяти искрового драйвера и до сих пор не вижу никакого разрешения.

Вот мой pyspark код:

enhDataDf = (
    sqlContext 
    .read.json(sys.argv[1]) 
    ) 

enhDataDf = (
    enhDataDf 
    .repartition('column1', 'column2') 
    .toPandas() 
    ) 
enhDataDf = sqlContext.createDataFrame(enhDataDf) 
enhDataDf = (
    enhDataDf 
    .toJSON() 
    .saveAsTextFile(sys.argv[2]) 
    ) 

Мои настройки искры следующим образом:

conf = SparkConf().setAppName('myapp1') 
conf.set('spark.yarn.executor.memoryOverhead', 8192) 
conf.set('spark.executor.memory', 8192) 
conf.set('spark.driver.memory', 8192) 
sc = SparkContext(conf=conf) 

Ошибки я получаю:

16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down. 
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties]. 
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties]. 
16/10/01 19:57:11 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/01 19:57:11 ERROR ApplicationMaster: User application exited with status 143 
log4j:ERROR Could not read configuration file from URL [file:/etc/spark/conf/log4j.properties]. 
log4j:ERROR Ignoring configuration file [file:/etc/spark/conf/log4j.properties]. 
16/10/01 19:57:56 ERROR ApplicationMaster: RECEIVED SIGNAL 15: SIGTERM 
16/10/01 19:57:56 ERROR ApplicationMaster: User application exited with status 143 
16/10/01 19:57:11 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:42167 disassociated! Shutting down. 
16/10/01 19:57:56 ERROR executor.CoarseGrainedExecutorBackend: Driver 172.31.58.76:37973 disassociated! Shutting down. 

код отлично работает на вверх до примерно 600 000 линий JSON - даже если имеется тонна памяти. Затем он продолжает терпеть неудачу.

Любые мысли о том, что происходит и как отлаживать/исправлять эту проблему?

ответ

3

Я считаю, что проблема исходит из следующей части кода:

enhDataDf = (
    enhDataDf 
    .repartition('column1', 'column2') 
    .toPandas() 
) 

.toPandas() коллектов данных, поэтому, когда число записей растет, это приведет к сбою драйвера.

Согласно вашему комментарию, это точный трубопровод, который вы используете. Это означает, что весь этап не только устарел, но и неверен.Когда данные собираются и далее распараллеливания есть гарантия, что разделение, созданное

.repartition('column1', 'column2') 

будет сохранено, когда вы заново создать искру DataFrame:

sqlContext.createDataFrame(enhDataDf) 

Если вы хотите записать данные по колонку вы можете сделать она непосредственно:

(sqlContext 
    .read.json(sys.argv[1]) 
    .repartition('column1', 'column2') 
    .write 
    .json(sys.argv[2])) 

пропуская промежуточный toPandas и преобразование в RDD.

После ваших комментариев:

Если toPandas служит цели, то он всегда будет оставаться ограничивающим фактором в трубопроводе и единственным прямым решением является масштабировать до узла драйвера. В зависимости от точных алгоритмов, которые вы применяете по собранным данным, вы можете рассмотреть альтернативные варианты:

  • Повторное использование алгоритмов, которые вы используете на вершине искры, уже не доступно.
  • Рассмотрите альтернативные рамки с лучшей совместимостью стека SciPy (например, Dask).
1

Это напомнит мне о моем Spark – Container exited with a non-zero exit code 143, где я работал в PySpark в кластере режиме, что указывает на проблему с памятью в вашем приложении.

Сначала попробуйте определить, какие машины терпят неудачу, драйвер или исполнитель (и), и, таким образом, смогут лучше нацелить ваши движения - с того, что я прочитал, это должны быть исполнители.


Я вижу, что вы уже установили memoryOverhead конфигурацию, хорошо. Теперь давайте сконцентрируемся на конфигурации memory:

... работает Python с Спарк (PySPark), поэтому весь код мой убегает кучу. По этой причине мне приходится выделять «небольшую» память (так как это сократит память, которую мне разрешено использовать из общей памяти, т. Е. Если полная память, которую мне разрешено использовать, равна 20G, и я запрашиваю 12G, тогда . 8G останется для моего приложения Python использовать

Так пытаются уменьшение что атрибут, да уменьшить его


Следующая цель:! #cores

уменьшение это тоже, например, если вы используете 8, а затем использовать 4 в исполнителях и/или водителя:

spark.executor.cores      4 
spark.driver.cores       4 
+0

Это все еще не помогает. Продолжайте получать сбои с одинаковыми сообщениями об ошибках. Я буквально бегу на M4.2x больших экземплярах с 32 ГБ памяти и настройками выше. Очень раздражает то, что он просто дает эти загадочные ошибки и не дает слепой складки. – Gopala

+0

Хм, я не вижу даже вымного от вас @Gopala, так что это значит, что мой ответ плох, я должен его удалить? – gsamaras

+3

Я не думаю, что ответ плох. У него есть некоторые идеи и полезные ссылки. Просто это не решило мою проблему, и я все еще жду, чтобы увидеть, есть ли дополнительная помощь. – Gopala