Недавно мы перешли на HDP 2.5 с Kafka 0.10.0 и Spark 1.6.2. Поэтому я изменил свой pom и некоторые из API для работы с новым Kafka. Я могу запустить код, но я не вижу сообщений. Я добавил фрагмент кода ниже. Я также разместил своего пом. Я не уверен, что здесь происходит не так. Может кому-то помочь.Невозможно получить любые сообщения в Kafka 0.10.0 with Spark stream 1.6.2
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName(
"SparkApp");
JavaStreamingContext jssc = new JavaStreamingContext(conf,
Durations.seconds(2));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(this.topic, this.numThreads);
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", kfkBroker);
kafkaParams.put("zookeeper.connect", zkBroker);
kafkaParams.put("group.id", "default");
kafkaParams.put("fetch.message.max.bytes", "60000000");
JavaPairReceiverInputDStream<String, String> kafkaInStream = KafkaUtils.createStream(
jssc,
String.class,
String.class,
kafka.serializer.StringDecoder.class,
kafka.serializer.StringDecoder.class,
kafkaParams,
topicMap,
StorageLevel.MEMORY_AND_DISK());
kafkaInStream.foreachRDD(new VoidFunction<JavaPairRDD<String, String>>()
{
/**
*
*/
private static final long serialVersionUID = 1L;
public void call(JavaPairRDD<String, String> v1) throws Exception
{
System.out.println("inside call.. JavaPairRDD size " + v1.count());
for (Tuple2<String, String> test : v1.collect())
{
this.eventMessage.setMessage(test._2);
}
}
});
я получаю выход «внутренний вызов .. JavaPairRDD размер 0» всегда что указывает, что искра не читает никаких данных. Я попытался подтолкнуть некоторые данные к теме через консольный клиент. Но это не помогло.
Вот мой pom.xml (только зависимости добавлено)
<dependencies>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.10 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.10.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20160810</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.8</version>
</dependency>
</dependencies>
Спасибо. Какова рекомендуемая версия для Kafka 0.10 и искры 1.6.2? Репозиторий Maven показывает 1.6.2 для искрового потока. – AnswerSeeker
Вам не нужно добавлять клиента Kafka в свой pom.xml. 'spark-streaming-kafka_2.10' уже зависит от этого. – zsxwing
Спасибо за подсказку. Может быть, мой вопрос был неправильным. Какую версию искрообразования Кафку следует использовать в моем проекте для моего Kafka 0.10 и искры 1.6.2? – AnswerSeeker