
spark-redshift - Error when using Spark 2.1.0

I am using spark-redshift to load a Kafka stream that receives data from the MySQL binlog.

When I try to save the RDD to Redshift, the following exception is thrown:

command> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.0,databricks:spark-redshift:3.0.0-preview1,com.amazonaws:aws-java-sdk:1.11.80,org.apache.hadoop:hadoop-aws:2.7.2 processor.py 

Processor code:

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

import json

def process(rdd, sc):
    try:
        # Each Kafka record is a (key, value) pair; the value is a Maxwell JSON
        # message whose 'data' field holds the row from the binlog.
        dataset = rdd.map(lambda kv: json.loads(kv[1])['data']).collect()
        spark = SparkSession.builder.config(conf=sc.getConf()).getOrCreate()

        df = spark.createDataFrame(dataset)
        df.write.format("com.databricks.spark.redshift") \
            .option("url", "jdbc:redshift://XXXXXXX.us-east-1.redshift.amazonaws.com:5439/cerebro?user=XXXXXd&password=XXXXXX-") \
            .option("dbtable", "cur_date") \
            .option("tempdir", "s3n://BUCKET/stg/avro/cur_date/data") \
            .option("aws_iam_role", "arn:aws:iam::XXXXXXXX:role/XXXXXXXXXXX") \
            .option("extracopyoptions", "TIMEFORMAT AS 'MM.DD.YYYY HH:MI:SS'") \
            .mode("error") \
            .save()
        #df.write.format("com.databricks.spark.avro").save("/tmp/output")
    except Exception as e:
        print(e)

conf = SparkConf().setMaster("local[*]").setAppName("BinlogStreamProcessor")

sc = SparkContext(conf=conf)
# Configure the native S3 filesystem and credentials used for the Redshift temp directory.
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "XXXXXXXXXXXXXXXXXX")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")

ssc = StreamingContext(sc, 10)  # 10-second batch interval

zkQuorum = "localhost:32774,localhost:32775,localhost:32776"
topic = "maxwell"

# Receiver-based Kafka stream: ZooKeeper quorum, consumer group id, one thread for the topic.
stream = KafkaUtils.createStream(ssc, zkQuorum, "binlog-consumer", {topic: 1})

stream.foreachRDD(lambda k: process(k, sc))

ssc.start()
ssc.awaitTermination()

Error message:

17/01/20 13:17:34 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 3) 
java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String; 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
17/01/20 13:17:34 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job 
17/01/20 13:17:34 ERROR FileFormatWriter: Aborting job null. 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 3, localhost, executor driver): java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String; 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1435) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1423) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1422) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1422) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802) 
    at scala.Option.foreach(Option.scala:257) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:802) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1650) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:628) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1918) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1931) 
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1951) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:127) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121) 
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56) 
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87) 
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198) 
    at com.databricks.spark.redshift.RedshiftWriter.unloadData(RedshiftWriter.scala:295) 
    at com.databricks.spark.redshift.RedshiftWriter.saveToRedshift(RedshiftWriter.scala:392) 
    at com.databricks.spark.redshift.DefaultSource.createRelation(DefaultSource.scala:108) 
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:426) 
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.AbstractMethodError: org.apache.spark.sql.execution.datasources.OutputWriterFactory.getFileExtension(Lorg/apache/hadoop/mapreduce/TaskAttemptContext;)Ljava/lang/String; 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.<init>(FileFormatWriter.scala:232) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:182) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129) 
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) 
    at org.apache.spark.scheduler.Task.run(Task.scala:99) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    ... 1 more 

I also tried other versions, such as 2.0.2 and 2.0.1, but without success.

Is there a way to fix this in this version, or to make this code work with a different version of Spark or spark-redshift? I am just starting out with Spark, so any advice would be helpful.

Answer


I had the same problem with Spark 2.1.0, but I was able to resolve it with Spark 2.0.0. OutputWriterFactory.getFileExtension() was added in 2.1.0 (see SPARK-18024), so I would expect that part to also work on 2.0.1 and 2.0.2 (but I have not tried them and could be wrong). Another note: spark-redshift appears to be certified only against Spark 2.0.0. Here is its build script:

sparkVersion := "2.0.0",
testSparkVersion := sys.props.get("spark.testVersion").getOrElse(sparkVersion.value),

...

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
  "org.apache.spark" %% "spark-sql" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
  "org.apache.spark" %% "spark-hive" % testSparkVersion.value % "test" exclude("org.apache.hadoop", "hadoop-client") force(),
  "com.databricks" %% "spark-avro" % testSparkAvroVersion.value % "test" exclude("org.apache.avro", "avro-mapred") force()
),
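
For reference, if you do downgrade, the only coordinate in the question's submit command that has to track the Spark version is the Kafka streaming connector; the spark-redshift, AWS SDK and hadoop-aws coordinates can stay as they are. A sketch of what the command might look like against a local Spark 2.0.0 installation (I have not run this exact combination, so treat it as a starting point):

command> ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.0,databricks:spark-redshift:3.0.0-preview1,com.amazonaws:aws-java-sdk:1.11.80,org.apache.hadoop:hadoop-aws:2.7.2 processor.py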