2016-03-06 4 views
1

Я безуспешно пытаюсь запустить версию примера подсчета слов Kinesis Streaming в EMR 4.3 в режиме кластера. В частности, никакие сообщения не читаются из Kinesis, хотя я могу получить доступ к метаданным потока.Пример потоковой передачи Kinesis, не работающий в режиме кластера на EMR (EMR 4.3, Spark 1.6)

Этот же код мчит в режиме клиента на идентичном кластере ОГО (т.е. «местного [*]»), но когда я пытаюсь сделать это в режиме кластерного первое задание от Kinesis приемника застряло: spark ui jobs

и я не вижу ничего в потоковом странице Спарк интерфейса: spark ui streaming

Первоначально я думал, что это был вопрос ресурсов/количество потоков, но в зависимости от конфигурации и что я вижу в пряжу и Spark UI, это, похоже, не так (см. Все соответствующие конфигурации ниже).

Я ищу любые указатели на то, почему приложение не может читать из Kinesis, или для предлагаемых изменений в конфигурации или настройке, чтобы эта работа работала в режиме кластера.


Конфигурирование и настройка деталей

Соответствующий Kinesis поток имеет один осколок.

Я использую следующую конфигурацию в настройке кластера ОЙ:

[{"classification":"capacity-scheduler", 

"properties":{"yarn.scheduler.capacity.resource-calculator":"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator"}}, 

{"classification":"spark","properties":{"maximizeResourceAllocation":"true"}}, 

{"classification":"spark-defaults","properties":{ 

"spark.executor.instances":"0", 
"spark.dynamicAllocation.enabled":"true"}}] 

И это, как моя установка внешнего вида, как и в свече ш среда: saprkui env

код, который я пытаюсь бежать:

val appName = "ks_"+DateTime.now().toString(formatter); 
val sparkConf = new SparkConf().setAppName(appName) 


val sc = new SparkContext(sparkConf); 
val batchIntervalInSec = 5 
val batchInterval = Seconds(batchIntervalInSec)  

val ssc = new StreamingContext(sc, batchInterval) 
ssc.checkpoint("/checkpoint") 
val kinesisClient = new AmazonKinesisClient(credentials) 
kinesisClient.setEndpoint(endpointUrl) 
val numShards = kinesisClient.describeStream(streamName).getStreamDescription().getShards().size 

val numStreams = numShards 

val kinesisCheckpointInterval = Seconds(batchIntervalInSec-1) 
val regionName = RegionUtils.getRegionByEndpoint(endpointUrl).getName() 


val kinesisStreams = (0 until numStreams).map { i => 
    KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, 
    InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_ONLY) 
} 

val unionStreams = ssc.union(kinesisStreams) 

val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) 


val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) 

wordCounts.print() 
ssc.start() 
ssc.awaitTermination(); 

И это команда искра, я бегу:

spark-submit --deploy-mode cluster 
--class com.komoona.spark.kmn_spark_scala.KinesisStream 
--master yarn --conf spark.executor.cores=4 
--conf spark.executor.instances=2 
--conf spark.streaming.blockInterval=1000ms 
--jars /home/hadoop/lib/spark-streaming-kinesis-asl-assembly_2.10-1.6.0.jar,/home/hadoop/lib/amazon-kinesis-client-1.6.1.jar, 
test_app_full.jar 

EDIT: я заметил, что, несмотря на имеющих 2 исполнителей, сконфигурированных (как указано в командной строке), только один исполнитель, и водитель, показан работаю в искровом UI: executors

Может быть, источник вопроса? Любые идеи, что может вызвать это?

ответ

0

У вас есть 0 исполнителей искры в вашей конфигурации - я считаю, вы должны увеличить это. Кроме того, посмотрите в журналах на наличие ошибок.

+0

Я видел теперь, когда вы указываете число исполнителей в качестве аргумента в командной строке, так что игнорировать ту часть моего предложения. – Leandro

0

После нескольких дней головной боли, я снова перешел на мою конфигурацию кинези и изменил режим агрегации на нет, он решил все это. Кинези с режимом агрегации на сработало для меня локально, , но не в режиме emr в кластере

+0

Как вы изменили режим агрегации? – user1158559

+1

, если вы используете logstash (как мы это делали), вы можете изменить его в конфигурации logstach, басистом это может решить эту проблему: https: //issues.apache.org/jira/browse/SPARK-14421. но после нескольких месяцев работы с искровым и кинезисом в крупном масштабе (100 гб в час) моя рекомендация состояла в том, чтобы держаться подальше от кинезита, она работала в небольших масштабах, но как только вы начали загружать ее со многими производителями журналов, у нас было много проблем с задержкой время прибытия журналов и много журналов прибывают более одного раза –

+0

Спасибо за головы о Кинезисе. Я не мог найти какие-либо документы, хотя на любой опции конфигурации, чтобы изменить этот режим агрегации. У вас есть ссылка на что-то подобное? В итоге я вернулся к Spark 1.4.1 там, где он работал, но дал мне только реальные полезные данные, но не метаданные. – user1158559

0

Это сработало для меня.Вы получили шанс look-

Spark not able to fetch events from Amazon Kinesis

TL; DR

There are 2 versions of the foreachRDD available

unionStreams.foreachRDD 
unionStreams.foreachRDD ((rdd:RDD[Array[Byte]], time: Time) 

For some reason the first one is not able to get me the results but changing to the second one fetches me the results as expected. Yet to explore the reason.

1

я такая же проблема с агрегированным KINESIS потоками с Спарком + Kinesis + ЭМ (несколько версий тестировались) ... Получается что, несмотря на то, что библиотека Kinesis построена явно с protobuf-java-2.6.1 (требование из-за зависимости KCL), кластеры EMR настроены таким образом, что на практике все еще используется protobuf-java-2.5.0.

Я не смог подобраться достаточно близко, чтобы выяснить, ПОЧЕМУ это происходит, но мое быстрое и грязное решение состояло в том, чтобы удалить /usr/lib/spark/jars/protobuf-java-2.5.0.jar и заменить его собственным protobuf-java-2.6.1 в том же месте (на главном узле). Я держу версию в s3 и имею самозагрузки действия на aws s3 cp2.6.1 банки вниз в нужном месте в /usr/lib/spark/jars, затем добавьте следующую строку в ваш искровом представить (заменяющий Скале и искровая версию, где это уместно):

--packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,\ com.google.protobuf:protobuf-java-2.6.1

Кто-то более умнее меня может найти более правильное решение, чем это, но я еще не видел никаких очевидных побочных эффектов при удалении protobuf-java-2.5.0, хотя это не значит, что они не существуют.

Вы можете проверить, если это тот же самый вопрос, вы испытываете, запустив (на искру мастера) с --master local[*] вместо --master yarn и обратить внимание на следующее в журналах:

17/01/31 19:24:13 ERROR Worker: Worker.run caught exception, sleeping for 1000 milli seconds! java.lang.RuntimeException: java.util.concurrent.ExecutionException: java.lang.NoSuchMethodError: com.google.protobuf.LazyStringList.getUnmodifiableView()Lcom/google/protobuf/LazyStringList;

TL; DR -

Заменить protobuf-java-2.5.0.jar с protobuf-java-2.6.1.jar в /usr/lib/spark/jars/ на свече мастера

Добавить (замена и искровые Scala версии)

--packages org.apache.spark:spark-streaming-kinesis-asl_2.11:2.1.0,\ com.google.protobuf:protobuf-java-2.6.1 к вашему искровым представить в командной строке