
Spark Kafka Receiver is not picking up data from all partitions

I created a Kafka topic with 5 partitions, and I am consuming it with the receiver-based createStream API, as shown below. Somehow only one receiver is getting the input data; the remaining receivers are not processing anything. Could you help?

JavaPairDStream<String, String> messages = null;

if (sparkStreamCount > 0) {
    // We create an input DStream for each partition of the topic, unify those
    // streams, and then repartition the unified stream.
    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        kafkaStreams.add(KafkaUtils.createStream(jssc,
            contextVal.getString(KAFKA_ZOOKEEPER),
            contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }
    messages = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    messages = KafkaUtils.createStream(jssc,
        contextVal.getString(KAFKA_ZOOKEEPER),
        contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}
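The construction of kafkaTopicMap is not shown above. A hypothetical reconstruction, assuming the KAFKA_HPD_TOPICS configuration key that appears in a comment further down and a single consumer thread (neither is confirmed by the original post), might look like:

// Hypothetical reconstruction: the topic key and the thread count of 1 are assumptions.
Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>();
kafkaTopicMap.put(contextVal.getString(KAFKA_HPD_TOPICS), 1);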

[Screenshot: Spark UI with multiple receivers]

After adding the changes, I get the following exceptions:

INFO : org.apache.spark.streaming.kafka.KafkaReceiver - Connected to localhost:2181 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopping receiver with message: Error starting receiver 0: java.lang.AssertionError: assertion failed 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Called receiver onStop 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Deregistering receiver 0 
ERROR: org.apache.spark.streaming.scheduler.ReceiverTracker - Deregistered receiver for stream 0: Error starting receiver 0 - java.lang.AssertionError: assertion failed 
    at scala.Predef$.assert(Predef.scala:165) 
    at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:36) 
    at kafka.consumer.TopicCount$$anonfun$makeConsumerThreadIdsPerTopic$2.apply(TopicCount.scala:34) 
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) 
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:109) 
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) 
    at kafka.consumer.TopicCount$class.makeConsumerThreadIdsPerTopic(TopicCount.scala:34) 
    at kafka.consumer.StaticTopicCount.makeConsumerThreadIdsPerTopic(TopicCount.scala:100) 
    at kafka.consumer.StaticTopicCount.getConsumerThreadIdsPerTopic(TopicCount.scala:104) 
    at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:198) 
    at kafka.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:138) 
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:111) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:148) 
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:130) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:542) 
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:532) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986) 
    at org.apache.spark.SparkContext$$anonfun$37.apply(SparkContext.scala:1986) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 

INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver 0 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopping BlockGenerator 
INFO : org.apache.spark.streaming.util.RecurringTimer - Stopped timer for BlockGenerator after time 1473964037200 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Waiting for block pushing thread to terminate 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Pushing out the last 0 blocks 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped block pushing thread 
INFO : org.apache.spark.streaming.receiver.BlockGenerator - Stopped BlockGenerator 
INFO : org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Waiting for receiver to be stopped 
ERROR: org.apache.spark.streaming.receiver.ReceiverSupervisorImpl - Stopped receiver with error: java.lang.AssertionError: assertion failed 
ERROR: org.apache.spark.executor.Executor - Exception in task 0.0 in stage 29.0 

Answer


There is one problem with the code above. The kafkaTopicMap parameter of KafkaUtils.createStream specifies the Map of (topic_name -> numPartitions) to consume, and each partition is consumed in its own thread.
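As an illustration of that contract, here is a minimal sketch, assuming a placeholder topic name "mytopic" with 5 partitions and placeholder ZooKeeper and consumer-group values:

// topic_name -> number of consumer threads; here one thread per partition.
Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>();
kafkaTopicMap.put("mytopic", 5);

// A single receiver consuming all 5 partitions on 5 threads.
// "localhost:2181" and "my-consumer-group" are placeholder values.
JavaPairDStream<String, String> stream = KafkaUtils.createStream(
    streamingContext, "localhost:2181", "my-consumer-group", kafkaTopicMap);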

Try the following code:

JavaPairDStream<String, String> messages = null;
int sparkStreamCount = 5;
Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>();

if (sparkStreamCount > 0) {
    List<JavaPairDStream<String, String>> kafkaStreams =
        new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount);
    for (int i = 0; i < sparkStreamCount; i++) {
        // Set a positive consumer-thread count for the topic before creating each receiver.
        kafkaTopicMap.put(topic, i + 1);
        kafkaStreams.add(KafkaUtils.createStream(streamingContext,
            contextVal.getString(KAFKA_ZOOKEEPER),
            contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap));
    }
    messages = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
} else {
    messages = KafkaUtils.createStream(streamingContext,
        contextVal.getString(KAFKA_ZOOKEEPER),
        contextVal.getString(KAFKA_GROUP_ID), kafkaTopicMap);
}
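Once the unified stream is built, the streaming context still has to be started and kept alive; a minimal sketch, assuming streamingContext is the JavaStreamingContext used above and using print() as a stand-in output action:

messages.print();                    // stand-in sink so each batch is materialized
streamingContext.start();            // start the receivers and the streaming job
streamingContext.awaitTermination(); // block until the job stops or fails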

Thank you so much Hokam for your answer. I made the changes you recommended, but now it is not processing anything and just keeps printing the following messages: – Alchemist


INFO: org.apache.spark.streaming.receiver.BlockGenerator - Pushed block input-2-1473954491600 INFO: org.apache.spark.storage.MemoryStore - ensureFreeSpace(213) called with curMem=487780, maxMem=556038881 INFO: org.apache.spark.storage.MemoryStore - Block input-2-1473954496800 stored as bytes in memory (estimated size 213.0 B, free 529.8 MB) INFO: org.apache.spark.storage.BlockManagerInfo - Added input-2-1473954496800 in memory on localhost:53678 (size: 213.0 B, free: 530.2 MB) WARN: org.apache.spark.storage.BlockManager - Block input-2-1473954496800 replicated to only 0 peer(s) – Alchemist


Map<String, Integer> kafkaTopicMap = new HashMap<String, Integer>(); kafkaTopicMap.put(contextVal.getString(KAFKA_HPD_TOPICS), sparkStreamCount); List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(sparkStreamCount); for (int i = 0; i … – Alchemist