2016-11-16 10 views
0

Я объединение двух dataframes на одну общей колонке, а затем побежал метод показа:раб потерял и очень медленно присоединиться к искре

df= df1.join(df2, df1.col1== df2.col2, 'inner') 
    df.show() 

Тогда присоединяйтесь выбежал очень медленно и, наконец, поднять ошибку: раб потерял.

Py4JJavaError: An error occurred while calling o109.showString. 

    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 8.0 failed 4 times, most recent failure: Lost task 0.3 in stage 8.0 : ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Slave lost 

Driver StackTrace: на

org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1499) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2086) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1498) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1505) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2099) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1374) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1456) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:209) at java.lang.Thread.run(Thread.java:745)

После некоторого поиска, кажется, что это проблема, связанная память. Затем я увеличил перераспределение до 3000, увеличил память исполнителей, увеличил memoryOverhead, но до сих пор не повезло, у меня такая же потерянная ведомая ошибка. Во время df.show() я обнаружил, что один из файлов записи в случайном порядке очень высок, другие были не такими высокими. Любые подсказки? Благодаря

+0

Похоже, что данные могут быть перекошены - сколько строк находится в двух кадрах данных соответственно? Также, какие типы экземпляров вы используете и сколько памяти вы выделяете? И можете ли вы попытаться сделать 'count' вместо' show' после 'join'? –

+0

@GlennieHellesSindholt Да, счет прошел. Один блок данных в 100 раз больше, чем другой. Меньший df составляет около 6M. Я использую Spark 1.6 на EC2. – newleaf

+0

Я подозревал. Я предполагаю, что если вы делаете 'df = df1.join (df2, df1.col1 == df2.col2, 'inner'). Persist (StorageLevel.MEMORY_AND_DISK)', за которым следует 'df.count', за которым следует 'df.show', это, вероятно, проходит, верно? –

ответ

1

При использовании Scala попробовать

val df = df1.join(df2,Seq("column name")) 

если pyspark

df = df1.join(df2,["columnname"]) 

или

df = df1.join(df2,df1.columnname == df2.columnname) 
display(df) 

Если пытается сделать то же самое в pyspark - SQL

df1.createOrReplaceTempView("left_test_table") 
df2..createOrReplaceTempView("right_test_table") 
left <- sql(sqlContext, "SELECT * FROM left_test_table") 
right <- sql(sqlContext, "SELECT * FROM right_test_table") 

head(drop(join(left, right), left$name))