2017-02-21 45 views
0

Когда я пытаюсь выполнить проект искры kafka. Я получаю ниже ошибок:Исключение из потока «main» java.lang.NoClassDefFoundError: org/spark_project/guava/cache/CacheLoader

Exception in thread "main" java.lang.NoClassDefFoundError: org/spark_project/guava/cache/CacheLoader 
    at org.apache.spark.SparkConf.loadFromSystemProperties(SparkConf.scala:73) 
    at org.apache.spark.SparkConf.<init>(SparkConf.scala:68) 
    at org.apache.spark.SparkConf.<init>(SparkConf.scala:55) 

Я попытался ниже методов, которые уже задавали на форуме: 1) Добавлен

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
<modelVersion>4.0.0</modelVersion> 
<groupId>com.iot.app.kafka</groupId> 
<artifactId>iot-kafka-producer</artifactId> 
<version>1.0.0</version> 
<name>IoT Kafka Producer</name> 



<dependencies> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.9.0.0</version> 
    </dependency> 

    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-network-common_2.11</artifactId> 
     <version>1.6.1</version> 
    </dependency> 

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 --> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-streaming-kafka_2.10</artifactId> 
     <version>1.6.3</version> 
    </dependency> 

    <!-- <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> 
     <version>2.1.0</version> </dependency> --> 
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> 
    <dependency> 
     <groupId>org.apache.hadoop</groupId> 
     <artifactId>hadoop-client</artifactId> 
     <version>2.2.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-core_2.11</artifactId> 
     <version>1.6.2</version> 
    </dependency> 

    <dependency> 
     <groupId>org.spark-project.spark</groupId> 
     <artifactId>unused</artifactId> 
     <version>1.0.0</version> 
     <scope>provided</scope> 
    </dependency> 

    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-core</artifactId> 
     <version>2.6.6</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-databind</artifactId> 
     <version>2.6.6</version> 
    </dependency> 
    <dependency> 
     <groupId>com.fasterxml.jackson.core</groupId> 
     <artifactId>jackson-annotations</artifactId> 
     <version>2.6.6</version> 
    </dependency> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>4.12</version> 
    </dependency> 

    <dependency> 
     <groupId>com.google.guava</groupId> 
     <artifactId>guava</artifactId> 
     <version>19.0</version> 
    </dependency> 

</dependencies> 

Код: Spark Потребительского код

package datapipeline; 

import java.io.FileInputStream; 

import java.io.FileNotFoundException; 
import java.io.IOException; 
import java.io.InputStream; 
import java.util.ArrayList; 
import java.util.Arrays; 
import java.util.Collection; 
import java.util.Collections; 
import java.util.HashMap; 
import java.util.HashSet; 
import java.util.Map; 
import java.util.Properties; 
import java.util.Set; 
import java.util.regex.Pattern; 

import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.spark.SparkConf; 
import org.apache.spark.SparkContext; 
import org.apache.spark.api.java.JavaSparkContext; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaPairDStream; 
import org.apache.spark.streaming.api.java.JavaPairInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.kafka.KafkaUtils; 

import kafka.serializer.StringDecoder; 
import scala.Tuple2; 


public class CustomerKafkaConsumerThread { 
    String broker; 
    private static final Pattern SPACE = Pattern.compile(" "); 

    public void sparkKafkaConsumer(String topics,String broker){ 
     this.broker=broker; 
     SparkConf conf=new SparkConf().setAppName("CustomerKafkaConsumerThread").setMaster("local"); 
     JavaStreamingContext jssc=new JavaStreamingContext(conf, new Duration(2000)); 

     Map<String, String> kafkaParams=new HashMap<String, String>(); 
     kafkaParams.put("metadata.broker.list",broker); 
     Set<String> topicSet=Collections.singleton(topics); 

     // Create direct kafka stream with brokers and topics 
     JavaPairInputDStream<String, String> messages=KafkaUtils.createDirectStream(
     jssc, 
     String.class, 
     String.class, 
     StringDecoder.class, 
     StringDecoder.class, 
     kafkaParams, 
     topicSet); 

     JavaDStream<String> lines = messages.map(Tuple2::_2); 
     System.out.println("........." + lines); 
     JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(SPACE.split(x))); 
     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<>(s, 1)) 
      .reduceByKey((i1, i2) -> i1 + i2); 
     wordCounts.print(); 

     // Start the computation 
     jssc.start(); 
     jssc.awaitTermination(); 


    } 
} 

2) Удалено jarfile google.guava из сборки p ат в затмении и снова добавлен в качестве внешней банки.

Но ниже два метода не помогли в моем случае.

Пожалуйста, кто-то мне помочь в решении этой issue.Thanks заранее

+0

Не могли бы вы предоставить дополнительные сведения о pom.xml? – semsorock

+0

Как вы запускали коды? Использование spark-submit или нет? – zsxwing

+0

Привет @zsxwing Спасибо за ответ. Нет, я не выполнил программу с помощью spark-submit. Я выполняю свою программу в eclipse run-> java-приложении. Код для потребителей Spark загружен для справки. – user3837415

ответ

0

Вы должны использовать ту же версию scala в своих зависимостей. Пожалуйста, попробуйте изменить spark-streaming-kafka_2.10 к spark-streaming-kafka_2.11:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.11</artifactId> 
    <version>1.6.3</version> 
</dependency> 

И использовать ту же spark версию. Например 1.6.3:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-network-common_2.11</artifactId> 
    <version>1.6.3</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-streaming-kafka_2.10</artifactId> 
    <version>1.6.3</version> 
</dependency>  
<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.11</artifactId> 
    <version>1.6.3</version> 
</dependency> 
+0

Я внес изменения в файл pom и изменил также версию зависимых зависимостей версий. Но все же проблема видна :( – user3837415

+0

Основная причина - версия 'guava'.' Spark-core' use 'guava' version' 14.0.1' (как указано). Можете ли вы понизить версию 'guava'? – semsorock

+0

Спасибо много. Он работал после перехода на версию 2.11 – user3837415