3

Я пытаюсь запустить приложение spark в локальном режиме. Чтобы настроить все это, я последовал этому руководству: http://blog.d2-si.fr/2015/11/05/apache-kafka-3/, (в French), показывающий каждый шаг, чтобы создать локальную среду kafka/zookeeper.Локальное приложение Kafka с ошибкой: NoSuchMethodError: createEphemeral

Кроме того, я использую IntelliJ со следующей конфигурацией:

val sparkConf = new SparkConf().setAppName("zumbaApp").setMaster("local[2]") 

И мой выполнения конфигурации, для потребителя:

"127.0.0.1:2181" "zumbaApp-gpId" "D2SI" "1" 

И для производителя:

"127.0.0.1:9092" "D2SI" "my\Input\File.csv" 300 

Заблаговременно , Я проверил, получил ли потребитель данные от производителя по умолчанию console-producer и console-consumer от kafka_2.10-0.9.0.1; оно делает.

Но я столкнулся следующее сообщение об ошибке:

java.lang.NoSuchMethodError: org.I0Itec.zkclient.ZkClient.createEphemeral(Ljava/lang/String;Ljava/lang/Object;Ljava/util/List;)V 
at kafka.utils.ZkPath$.createEphemeral(ZkUtils.scala:921) 
at kafka.utils.ZkUtils.createEphemeralPath(ZkUtils.scala:348) 
at kafka.utils.ZkUtils.createEphemeralPathExpectConflict(ZkUtils.scala:363) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:839) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$18.apply(ZookeeperConsumerConnector.scala:833) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) 
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) 
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) 
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) 
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.reflectPartitionOwnershipDecision(ZookeeperConsumerConnector.scala:833) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:721) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1$$anonfun$apply$mcV$sp$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:636) 
at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcV$sp(ZookeeperConsumerConnector.scala:627) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply(ZookeeperConsumerConnector.scala:627) 
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33) 
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:626) 
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:967) 
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:254) 
at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:156) 
at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111) 
at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:575) 
at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:565) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1992) 
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
at org.apache.spark.scheduler.Task.run(Task.scala:89) 
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
at java.lang.Thread.run(Thread.java:745) 

мне не удалось в решении этого. Я думал, что это была ошибка zookeeper -config, но после сравнения с рабочей версией приложения на другой машине с теми же конфигурационными файлами, похоже, это уже не так.

ответ

4

Похоже, у вас есть проблема зависимости.

Проверьте версию библиотеки com.101tec.zkclient в нашем пути к классам. Кафке нужна версия 0.7

Кроме того, поскольку kafka_2.10-0.9.0.1 API для производителей и потребителей больше не используют zookeeper. Кажется, что Spark-streaming использует в вашем случае версию Kafka 0.8.

+0

Ответ был прост, чем я думал, но трудно понять. Это решило мою проблему. Большое спасибо. – wipman