Я безуспешно пытаюсь запустить версию примера подсчета слов Kinesis Streaming в EMR 4.3 в режиме кластера. В частности, никакие сообщения не читаются из Kinesis, хотя я могу получить доступ к метаданным потока.Пример потоковой передачи Kinesis, не работающий в режиме кластера на EMR (EMR 4.3, Spark 1.6)
Этот же код мчит в режиме клиента на идентичном кластере ОГО (т.е. «местного [*]»), но когда я пытаюсь сделать это в режиме кластерного первое задание от Kinesis приемника застряло:
и я не вижу ничего в потоковом странице Спарк интерфейса:
Первоначально я думал, что это был вопрос ресурсов/количество потоков, но в зависимости от конфигурации и что я вижу в пряжу и Spark UI, это, похоже, не так (см. Все соответствующие конфигурации ниже).
Я ищу любые указатели на то, почему приложение не может читать из Kinesis, или для предлагаемых изменений в конфигурации или настройке, чтобы эта работа работала в режиме кластера.
Конфигурирование и настройка деталей
Соответствующий Kinesis поток имеет один осколок.
Я использую следующую конфигурацию в настройке кластера ОЙ:
[{"classification":"capacity-scheduler",
"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}},
{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}},
{"classification":"spark-defaults","properties":{
"spark.executor.instances":"0",
"spark.dynamicAllocation.enabled":"true"}}]
И это, как моя установка внешнего вида, как и в свече ш среда:
код, который я пытаюсь бежать:
val appName = "ks_"+DateTime.now().toString(formatter);
val sparkConf = new SparkConf().setAppName(appName)
val sc = new SparkContext(sparkConf);
val batchIntervalInSec = 5
val batchInterval = Seconds(batchIntervalInSec)
val ssc = new StreamingContext(sc, batchInterval)
ssc.checkpoint("/checkpoint")
val kinesisClient = new AmazonKinesisClient(credentials)
kinesisClient.setEndpoint(endpointUrl)
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size
val numStreams = numShards
val kinesisCheckpointInterval = Seconds(batchIntervalInSec-1)
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName()
val kinesisStreams = (0 until numStreams).map { i =>
KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName,
InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_ONLY)
}
val unionStreams = ssc.union(kinesisStreams)
val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" "))
val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination();
И это команда искра, я бегу:
spark-submit --deploy-mode cluster
--class com.komoona.spark.kmn_spark_scala.KinesisStream
--master yarn --conf spark.executor.cores=4
--conf spark.executor.instances=2
--conf spark.streaming.blockInterval=1000ms
--jars /home/hadoop/lib/spark-streaming-kinesis-asl-assembly_2.10-1.6.0.jar,/home/hadoop/lib/amazon-kinesis-client-1.6.1.jar,
test_app_full.jar
EDIT: я заметил, что, несмотря на имеющих 2 исполнителей, сконфигурированных (как указано в командной строке), только один исполнитель, и водитель, показан работаю в искровом UI:
Может быть, источник вопроса? Любые идеи, что может вызвать это?
Я видел теперь, когда вы указываете число исполнителей в качестве аргумента в командной строке, так что игнорировать ту часть моего предложения. – Leandro