0

У меня возникли серьезные проблемы с попыткой запустить потоковое приложение с искровым кинезитом в кластер EMR Amazon (1 мастер, 3 сотрудника).Ошибка при запуске приложения для ускоренного кинезирования в режиме кластера (Amazon EMR)

То, что я пытаюсь достичь, - это создать банку FAT с помощью плагина сборки sbt для запуска команды spark-submit на главном узле. Команда я использую следующий:

искровых представить --class МойКласс --master пряжа --deploy-Mode кластера 1g --executor-память --executor-ядра 2 HDFS: // url: port/my.jar

Это приложение принимает входящие данные из потока кинезий и на его основе выполняет запрос (пост-обратно) к URL-адресу, на который я могу отслеживать. Я тестировал свое приложение локально, запустив его настройку «Мастер на SparkConfig» на локальные [ядра].

Кроме того, здесь мой код и моя сборка sbt. Мой проект использует Scala 2.11.8 и SBT 0.13.8

build.sbt

name := "my-app" 

version := "1.0" 

scalaVersion := "2.11.8" 

libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-streaming_2.11" % "2.0.0" % "provided", 
    "org.apache.spark" % "spark-streaming-kinesis-asl-assembly_2.11" % "2.0.0", 
    "org.scalaj" % "scalaj-http_2.11" % "2.3.0" 
) 

assemblyMergeStrategy in assembly := { 
    case PathList("org", "apache", xs @ _*) => MergeStrategy.last 
    case x => 
    val oldStrategy = (assemblyMergeStrategy in assembly).value 
    oldStrategy(x) 
} 

MyClass

import com.amazonaws.auth.BasicAWSCredentials 
import com.amazonaws.internal.StaticCredentialsProvider 
import com.amazonaws.regions.RegionUtils 
import com.amazonaws.services.kinesis.AmazonKinesisClient 
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream 
import com.keynetic_digital.model.RawKDLog 
import org.apache.spark.SparkConf 
import org.apache.spark.storage.StorageLevel 
import org.apache.spark.streaming.kinesis.KinesisUtils 
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext} 
import utils.HttpRequest 

/** 
    * Created by franco on 18/01/17. 
    */ 
object MyClass { 

    val awsAccessKeyId : String = "xxxxxxxxx" 
    val awsSecretKey : String = "xxxxxxxx" 
    val kinesisStreamName : String = "xxxxxxxx" 
    val kinesisEndpoint : String = "https://kinesis.us-west-2.amazonaws.com" 
    val appName : String = "xxxxxxxxx" 


    def main(args: Array[String]): Unit = { 
    //Set up Amazon Kinesis Client 
    val kinesisClient : AmazonKinesisClient = createKinesisClient 

    //Get all Kinesis shards 
    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size() 

    val batchInterval = Milliseconds(5000) 

    // Create Spark Streaming Context 
    val ssc : StreamingContext = createContext(batchInterval) 
    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpoint).getName 

    // Create the Kinesis DStreams 
    val kinesisStreams = (0 until shards).map { i => 
     KinesisUtils.createStream(ssc, appName, 
     kinesisStreamName,kinesisEndpoint, 
     regionName,InitialPositionInStream.LATEST, batchInterval * 2, 
     StorageLevel.MEMORY_AND_DISK_2) 
    } 

    //Unified all Streams 
    val stream = ssc.union(kinesisStreams) 

    //Get an RDD of Option(KDLog) items 
    val jsons = stream.map(bytes => Option(RawKDLog.fromJson(bytes))).filter(_.isDefined) 

    jsons.foreachRDD{rdd => 
     rdd.foreach{log => 
     handleLog(log) 
     } 
    } 

    // Start the streaming context and await termination 
    ssc.start() 
    ssc.awaitTermination() 
    } 

    def createKinesisClient : AmazonKinesisClient = { 
    //Set System Properties for Worker 
    System.setProperty("aws.accessKeyId", awsAccessKeyId) 
    System.setProperty("aws.secretKey", awsSecretKey) 

    //Setting AWS Credentials 
    val credentials : BasicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey) 

    //Setting AWS Credential Provider 
    val provider : StaticCredentialsProvider = new StaticCredentialsProvider(credentials) 

    //Setting Kinesis Client 
    val kinesisClient : AmazonKinesisClient = new AmazonKinesisClient(provider) 
    kinesisClient.setEndpoint(kinesisEndpoint) 

    kinesisClient 
    } 

    def createContext(batchInterval : Duration) : StreamingContext = { 

    // Create Spark Configuration 
    val config = new SparkConf().setAppName(appName) 

    // Create Spark Streaming Context 
    new StreamingContext(config, batchInterval) 
    } 

    def handleLog(log : Option[RawKDLog]) : Unit = { 
    if(log.isDefined){ 
     postBack(log.get) 
    } 
    } 

    /** 
    * Method that handles url postback requests 
    */ 
    private def postBack(log : RawKDLog) = { 
    //TODO url queryString replacement & request masking 
    val postBackUrl : String = "url where I can track requests by tailing Nginx log" 

    HttpRequest(postBackUrl) .asString 

    } 

} 

После завершения заполнения приложения в главном узле кластера, то следующая ошибка.

17/01/18 14:39:26 INFO Client: 
    client token: N/A 
    diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 74, ip-172-31-42-151.us-west-2.compute.internal): java.lang.NoSuchMethodError: org.apache.spark.storage.BlockManager.get(Lorg/apache/spark/storage/BlockId;)Lscala/Option; 
    at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.getBlockFromBlockManager$1(KinesisBackedBlockRDD.scala:104) 
    at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.compute(KinesisBackedBlockRDD.scala:117) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38) 
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319) 
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70) 
    at org.apache.spark.scheduler.Task.run(Task.scala:86) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

Driver stacktrace: 
    ApplicationMaster host: 175.31.43.46 
    ApplicationMaster RPC port: 0 
    queue: default 
    start time: 1484750075922 
    final status: FAILED 
    tracking URL: http://ip-175-31-46-219.us-west-2.compute.internal:20888/proxy/application_1484568737877_0012/ 
    user: hadoop 
Exception in thread "main" org.apache.spark.SparkException: Application application_1484568737877_0012 finished with failed status 
    at org.apache.spark.deploy.yarn.Client.run(Client.scala:1132) 
    at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1178) 
    at org.apache.spark.deploy.yarn.Client.main(Client.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
17/01/18 14:39:26 INFO ShutdownHookManager: Shutdown hook called 

Я бы очень признателен за любые рекомендации по этому вопросу, так как я совершенно не знаком с разработкой и работой с искровыми приложениями.

ответ

0

Решение этой проблемы было сказано по этому вопросу Spark Streaming - Error when reading from Kinesis

На самом деле то, что произошло, что я создал свой кластер с амазонки ЭМИ 5.2.1, который использует Спарк 2.0.2, и я в комплект моего FAT банки использовать spark-streaming-kinesis-asl-assembly_2.11 2.0.0.

Поэтому мое приложение не может прочитать входные данные от Kinesis

я перешел на ЭМИ 5.0.0, который является последней версии с использованием искровых 2.0.0 и все выполняется, как ожидалось