2017-02-14 19 views
2

В настоящее время я работаю с кластером EMR, подключающимся к RDS, чтобы собрать 2 таблицы.Amazon EMR Pyspark: rdd.distinct.count() failling

Два созданных RDD довольно велики, но я могу выполнять операции .take (x), другие из них.

я также можем выполнять более сложные операции, такие как:

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

Но делать следующую операцию для подсчета числа различных пользователей, импортированных из RDS не работает:

unique_users = rdd.distinct.count() 

Я попытался много конфигурации, чтобы увидеть, были ли это проблемы памяти до (на всякий случай, но это не решает проблему) ...

Это ошибки, которые я получаю сейчас:

Traceback (most recent call last): 
File "/home/hadoop/AppEngine/src/server.py", line 56, in <module> 
run_server() 
File "/home/hadoop/AppEngine/src/server.py", line 53, in run_server 
AppServer().run() 
File "/home/hadoop/AppEngine/src/server.py", line 45, in run 
api = create_app(self.context, self.apps, self.devices) 
File "/home/hadoop/AppEngine/src/api.py", line 190, in create_app 
engine = AppEngine(spark_context, apps, devices) 
File "/home/hadoop/AppEngine/src/engine.py", line 56, in __init__ 
self.unique_users = self.ratings.distinct().count() 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1041, in count 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1032, in sum 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 906, in fold 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 809, in collect 
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__ 
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco 

File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value 
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.5 in stage 0.0 (TID 5, ip-172-31-3-140.eu-west-1.compute.internal, executor 13): ExecutorLostFailure (executor 13 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 164253 ms 
Driver stacktrace: 
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
at scala.Option.foreach(Option.scala:257) 
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1944) 
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1958) 
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:935) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362) 
at org.apache.spark.rdd.RDD.collect(RDD.scala:934) 
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:453) 
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:498) 
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
at py4j.Gateway.invoke(Gateway.java:280) 
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
at py4j.commands.CallCommand.execute(CallCommand.java:79) 
at py4j.GatewayConnection.run(GatewayConnection.java:214) 
at java.lang.Thread.run(Thread.java:745)` 
+0

Я имею в виду сообщение в исключение 'ExecutorLostFailure (исполнитель 13 возбужденном вызвана одной из запущенных задач) Причина: Палач сердцебиение истекло после 164253 ms' – mrsrinivas

ответ

1

Решение для проблемы было следующее:

у меня не было достаточно памяти для выполнения задания. Я изменил тип основного экземпляра, который я использовал в своем кластере, к экземпляру с большим объемом доступной памяти (m4.4xlarge здесь).

Тогда я должен был точные параметры, чтобы заставить распределение памяти моих экземпляров для искрового sumbmit:

--driver-memory 2G 
--executor-memory 50G 

Вы также можете добавить эти параметры, чтобы избежать длительного задания из failling из-за биения или выделение памяти:

--conf spark.yarn.executor.memoryOverhead=XXX (large number such as 1024 or 4096) 
--conf spark.executor.heartbeatInterval=60s 
1

ExecutorLostFailure Причина: Палач сердцебиение истекло после 164253 мс

Эта ошибка означает, что исполнитель не отвечает после 165 секунд, и был убит (в предположении, что он мертв)

Если у вас есть задача, занимающая исполнитель в течение столь длительного времени, и ее необходимо выполнить, вы можете попробовать следующую настройку в командной строке spark-submit, которая увеличит время ожидания пульса до огромного количества времени как указано здесь: https://stackoverflow.com/a/37260231/5088142

Некоторые методы, как исследовать этот вопрос можно найти здесь: https://stackoverflow.com/a/37272249/5088142


Ниже будет пытаться прояснить некоторые вопросы, которые подняты в вашем вопросе.

Spark Actions vs Transformations

Спарк использует ленивые вычисления, то есть при выполнении transformation он не выполняет его. Спарк выполнить только при выполнении action

В сложных операциях, например, вы дали не существует никаких действий (т.е. ничего не было выполнено/вычисленным):

info_rdd = somerdd.map(lambda x: (x[1], x[2])).groupByKey().map(some_lambda) 
apps_rdd = apps.join(info_rdd).map(lambda x: (x[0], (x[1][0], x[1][1][0], x[1][1][1]))) 

Обзор spark doc about transformation

Вы можете видеть, что все операции используемые в примере: map, groupByKey и join являются трансформациями.

Следовательно, ничего не было сделано после выполнения этих команд.

Разница между действиями

Этих два РДА, созданными довольно огромен, но я могу выполнить .Снять (х) другие операции их.

Существует разница между take(x) действием и count

take(x) действие заканчивается после того, как он возвращается первые элементы х.

count() действие заканчивается только после того, как он пройдет весь RDD

Тот факт, что вы выполняете некоторые преобразования (как в примере), которые были , кажется, работает не имеет никакого смысла - так как они не были выполнены.

Запуск take(x) Действие не может дать никаких указаний, поскольку оно будет использовать только очень небольшую часть вашего RDD.

Заключение

Похоже, что конфигурация вашего компьютера не поддерживает размер данных, которые вы используете, или ваш код создает огромные задачи, которые вызывают исполнитель повесить в течение длительного периода времени (160 секунд).

Первый action, который был фактически выполнен на вашем РДУ был countaction