У меня возникли серьезные проблемы при попытке запустить потоковое приложение Spark Streaming с Kinesis в кластере Amazon EMR (1 мастер, 3 рабочих узла): ошибка при запуске приложения Spark Streaming + Kinesis в режиме кластера (Amazon EMR).
Я хочу собрать fat JAR с помощью плагина sbt-assembly и запустить его командой spark-submit на главном узле. Я использую следующую команду:
spark-submit --class MyClass --master yarn --deploy-mode cluster --executor-memory 1g --executor-cores 2 hdfs://url:port/my.jar
Это приложение читает входящие данные из потока Kinesis и на их основе выполняет запрос (postback) на URL-адрес, который я могу отслеживать. Я протестировал приложение локально, установив master в SparkConf в local[cores].
Кроме того, вот мой код и моя сборка sbt. Мой проект использует Scala 2.11.8 и SBT 0.13.8.
build.sbt
name := "my-app"

version := "1.0"

scalaVersion := "2.11.8"

// NOTE(review): the Spark version declared here MUST match the Spark release
// installed on the EMR cluster. The kinesis-asl-assembly artifact is bundled into
// the fat jar and links against Spark internals; a mismatch produces exactly the
// runtime failure seen in the log below:
//   java.lang.NoSuchMethodError: org.apache.spark.storage.BlockManager.get(...)
// Check the cluster's Spark version (e.g. via the EMR release notes) and align
// "2.0.0" with it before rebuilding the assembly.
libraryDependencies ++= Seq(
  // Use %% so the Scala binary suffix (_2.11) is derived from scalaVersion
  // instead of being hard-coded in each artifact name.
  // "provided": the cluster supplies spark-streaming at runtime, so it is
  // excluded from the assembly.
  "org.apache.spark" %% "spark-streaming" % "2.0.0" % "provided",
  // Bundled into the assembly because EMR does not ship the Kinesis connector.
  "org.apache.spark" %% "spark-streaming-kinesis-asl-assembly" % "2.0.0",
  "org.scalaj" %% "scalaj-http" % "2.3.0"
)

assemblyMergeStrategy in assembly := {
  // Prefer the last occurrence for duplicated Spark/Hadoop classes.
  case PathList("org", "apache", xs @ _*) => MergeStrategy.last
  case x =>
    // Fall back to sbt-assembly's default strategy (which already discards
    // conflicting META-INF entries) for everything else.
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
MyClass:
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import com.keynetic_digital.model.RawKDLog
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Duration, Milliseconds, StreamingContext}
import utils.HttpRequest
/**
* Created by franco on 18/01/17.
*/
object MyClass {

  // NOTE(review): never keep real credentials in source control; on EMR prefer the
  // instance profile (IAM role) so no static keys are needed at all.
  val awsAccessKeyId : String = "xxxxxxxxx"
  val awsSecretKey : String = "xxxxxxxx"
  val kinesisStreamName : String = "xxxxxxxx"
  val kinesisEndpoint : String = "https://kinesis.us-west-2.amazonaws.com"
  val appName : String = "xxxxxxxxx"

  /**
   * Entry point: discovers the stream's shard count, opens one Kinesis receiver
   * per shard, parses each record and fires a postback for every parsed log.
   */
  def main(args: Array[String]): Unit = {
    // Driver-side client, used only to discover how many shards the stream has.
    val kinesisClient : AmazonKinesisClient = createKinesisClient
    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

    val batchInterval = Milliseconds(5000)
    val ssc : StreamingContext = createContext(batchInterval)
    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpoint).getName

    // One receiver (DStream) per shard for maximum read parallelism.
    //
    // FIX: pass the AWS keys explicitly. The original relied on the System
    // properties set in createKinesisClient, but those are set only in the driver
    // JVM and never reach the executors that host the receivers; on the workers
    // the KCL then falls back to the default credential chain. The createStream
    // overload taking (awsAccessKeyId, awsSecretKey) ships the keys to the
    // receivers.
    val kinesisStreams = (0 until shards).map { _ =>
      KinesisUtils.createStream(
        ssc, appName, kinesisStreamName, kinesisEndpoint, regionName,
        InitialPositionInStream.LATEST,
        batchInterval * 2,                 // KCL checkpoint interval
        StorageLevel.MEMORY_AND_DISK_2,
        awsAccessKeyId, awsSecretKey)
    }

    // Unify all per-shard streams into a single DStream of raw record payloads.
    val stream = ssc.union(kinesisStreams)

    // Parse each payload; Option(...) turns a null from fromJson into None.
    val jsons = stream.map(bytes => Option(RawKDLog.fromJson(bytes))).filter(_.isDefined)

    // Fire one postback per successfully parsed log; runs on the executors.
    jsons.foreachRDD { rdd =>
      rdd.foreach(handleLog)
    }

    // Start the streaming context and block until it terminates.
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Builds a Kinesis client authenticated with the static keys above.
   * Used on the driver only (shard discovery).
   */
  def createKinesisClient : AmazonKinesisClient = {
    // Kept for backward compatibility; note these properties affect the driver
    // JVM only and are NOT visible on the executors.
    System.setProperty("aws.accessKeyId", awsAccessKeyId)
    System.setProperty("aws.secretKey", awsSecretKey)
    val credentials : BasicAWSCredentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
    val provider : StaticCredentialsProvider = new StaticCredentialsProvider(credentials)
    val kinesisClient : AmazonKinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpoint)
    kinesisClient
  }

  /**
   * Creates the StreamingContext. Do NOT call setMaster here: the master is
   * supplied by spark-submit (--master yarn / local[n]) so the same jar runs
   * both locally and on the cluster.
   */
  def createContext(batchInterval : Duration) : StreamingContext = {
    val config = new SparkConf().setAppName(appName)
    new StreamingContext(config, batchInterval)
  }

  /** Posts back the log if parsing produced one; no-op for None. */
  def handleLog(log : Option[RawKDLog]) : Unit =
    log.foreach(postBack)

  /**
   * Sends the tracking postback request for one parsed log.
   * TODO url queryString replacement & request masking
   */
  private def postBack(log : RawKDLog) = {
    val postBackUrl : String = "url where I can track requests by tailing Nginx log"
    HttpRequest(postBackUrl).asString
  }
}
После отправки приложения с главного узла кластера возникает следующая ошибка:
17/01/18 14:39:26 INFO Client:
client token: N/A
diagnostics: User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 74, ip-172-31-42-151.us-west-2.compute.internal): java.lang.NoSuchMethodError: org.apache.spark.storage.BlockManager.get(Lorg/apache/spark/storage/BlockId;)Lscala/Option;
at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.getBlockFromBlockManager$1(KinesisBackedBlockRDD.scala:104)
at org.apache.spark.streaming.kinesis.KinesisBackedBlockRDD.compute(KinesisBackedBlockRDD.scala:117)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:105)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
ApplicationMaster host: 175.31.43.46
ApplicationMaster RPC port: 0
queue: default
start time: 1484750075922
final status: FAILED
tracking URL: http://ip-175-31-46-219.us-west-2.compute.internal:20888/proxy/application_1484568737877_0012/
user: hadoop
Exception in thread "main" org.apache.spark.SparkException: Application application_1484568737877_0012 finished with failed status
at org.apache.spark.deploy.yarn.Client.run(Client.scala:1132)
at org.apache.spark.deploy.yarn.Client$.main(Client.scala:1178)
at org.apache.spark.deploy.yarn.Client.main(Client.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
17/01/18 14:39:26 INFO ShutdownHookManager: Shutdown hook called
Я был бы очень признателен за любые рекомендации по этому вопросу, так как я совершенно не знаком с разработкой и запуском приложений Spark.