
I get an error when trying to compile, test and run a JUnit test: java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy

I want to load a local Avro file using DataFrames, but I get an exception:

org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null 
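
To see which snappy-java jar the test JVM actually picks up (and whether the class resolves at all), a check like the following can be run from the test before touching Spark. This is only a diagnostic sketch using plain reflection, not part of the job itself:

//Diagnostic sketch only: print which jar the Snappy class is loaded from, if it loads at all.
try {
  val snappyClass = Class.forName("org.xerial.snappy.Snappy")
  println("Snappy loaded from: " + snappyClass.getProtectionDomain.getCodeSource.getLocation)
} catch {
  case t: Throwable => println("Snappy could not be loaded: " + t)
}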

I am not using Cassandra at all. These are the versions of the jars involved:

<properties> 
    <!-- Generic properties --> 
    <java.version>1.7</java.version> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
    <!-- Dependency versions --> 
    <maven.compiler.source>1.7</maven.compiler.source> 
    <maven.compiler.target>1.7</maven.compiler.target> 
    <scala.version>2.10.4</scala.version> 
    <junit.version>4.11</junit.version> 
    <slf4j.version>1.7.12</slf4j.version> 
    <spark.version>1.5.0-cdh5.5.2</spark.version> 
    <databricks.version>1.5.0</databricks.version> 
    <json4s-native.version>3.5.0</json4s-native.version> 
    <spark-avro.version>2.0.1</spark-avro.version> 
</properties> 

and these are the dependencies:

<dependencies> 

    <dependency> 
     <groupId>org.json4s</groupId> 
     <artifactId>json4s-native_2.10</artifactId> 
     <version>${json4s-native.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>${junit.version}</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>${spark.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-csv_2.10</artifactId> 
     <version>${databricks.version}</version> 
     <exclusions> 
      <exclusion> 
       <groupId>org.xerial.snappy</groupId> 
       <artifactId>snappy-java</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

    <dependency> 
     <groupId>org.xerial.snappy</groupId> 
     <artifactId>snappy-java</artifactId> 
     <version>1.0.4.1</version> 
     <scope>compile</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-avro_2.10</artifactId> 
     <version>${spark-avro.version}</version> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/log4j/log4j --> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
</dependencies> 

I tried to compile the project with

mvn clean install -Dorg.xerial.snappy.lib.name=libsnappyjava.jnlib -Dorg.xerial.snappy.tempdir=/tmp

having previously copied the jar into /tmp, with no luck.

$ ls -lt /tmp/ 
total 1944 
...27 dic 13:01 snappy-java-1.0.4.jar 
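
Whether those -D flags actually reach the forked test JVM depends on the Surefire configuration. One way to take that variable out of the picture is to set the snappy-java properties programmatically at the very top of the test, before any Spark class initializes Snappy. This is only a sketch; note also that on macOS the bundled native library is normally called libsnappyjava.jnilib, so the .jnlib value above may be a typo:

//Sketch: set the snappy-java native-library properties inside the test JVM itself,
//before any Spark class initializes Snappy. The property names are the ones snappy-java
//reads (the same ones used on the mvn command line above); the values are platform-dependent.
System.setProperty("org.xerial.snappy.tempdir", "/tmp")
System.setProperty("org.xerial.snappy.lib.name", "libsnappyjava.jnilib") //on Linux the library is libsnappyjava.so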

This is the code:

import org.apache.spark.rdd.RDD 
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SaveMode} 
import org.apache.spark.{SparkConf, SparkContext} 
import com.databricks.spark.avro._ 
import java.io._ 

//auxiliary function 
def readRawData(pathToResources: String, sqlContext: SQLContext, rawFormat: String = "json"): DataFrame = { 
val a: DataFrame = rawFormat match { 
    case "avro" => sqlContext.read.avro(pathToResources) 
    case "json" => sqlContext.read.json(pathToResources) 
    case _ => throw new Exception("Format not supported, use AVRO or JSON instead.") 
} 
val b: DataFrame = a.filter("extraData.type = 'data'") 
val c: DataFrame = a.select("extraData.topic", "extraData.timestamp", 
    "extraData.sha1Hex", "extraData.filePath", "extraData.fileName", 
    "extraData.lineNumber", "extraData.type", 
    "message") 

val indexForMessage: Int = c.schema.fieldIndex("message") 
val result: RDD[Row] = c.rdd.filter(r => 
    !r.anyNull match { 
    case true => true 
    case false => false 
    } 
).flatMap(r => { 
    val metadata: String = r.toSeq.slice(0, indexForMessage).mkString(",") 
    val lines = r.getString(indexForMessage).split("\n") 
    lines.map(l => Row.fromSeq(metadata.split(",").toSeq ++ Seq(l))) 
}) 
sqlContext.createDataFrame(result, c.schema) 
}//readRawData 
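
Since readRawData also accepts JSON, one way to confirm that the failure is specific to the Avro/Snappy path is to run the same transform on a small JSON resource first. This is only a sketch: the file name below is hypothetical, and the JSON records would need the same extraData fields for the select to succeed.

//Sketch: exercise the same pipeline with JSON input, which does not touch the Snappy codec.
//"sample.json" is a hypothetical test resource, used here only for illustration.
val jsonDF: DataFrame = readRawData("./src/test/resources/json/sample.json", sqlContext, rawFormat = "json")
jsonDF.printSchema()
println("rows: " + jsonDF.count())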


def validate(rawFlumeData : String = "FlumeData.1482407196579",fileNamesToBeDigested : String = "fileNames-to-be-digested.txt", sqlContext: SQLContext,sc:SparkContext) : Boolean = { 

val result : Boolean = true 

    sqlContext.sparkContext.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false") 

val rawDF : DataFrame = readRawData(rawFlumeData, sqlContext, rawFormat = "avro") 

rawDF.registerTempTable("RAW") 
//this line provokes the exception! cannot load snappy jar file! 
val arrayRows : Array[org.apache.spark.sql.Row] = sqlContext.sql("SELECT distinct fileName as filenames FROM RAW GROUP BY fileName").collect() 

val arrayFileNames : Array[String] = arrayRows.map(row=>row.getString(0)) 

val fileNamesDigested = "fileNames-AVRO-1482407196579.txt" 

val pw = new PrintWriter(new File(fileNamesDigested)) 

for (filename <-arrayFileNames) pw.write(filename + "\n") 

pw.close 

val searchListToBeDigested : org.apache.spark.rdd.RDD[String] = sc.textFile(fileNamesToBeDigested) 

//build a map with values like these: Map(EUR_BACK_SWVOL_SMILE_GBP_20160930.csv -> 0, UK_SC_equities_20160930.csv -> 14,...
//val mapFileNamesToBeDigested: Map[String, Long] = searchListToBeDigested.zipWithUniqueId().collect().toMap 

val searchFilesAVRODigested = sc.textFile(fileNamesDigested) 

val mapFileNamesAVRODigested: Map[String, Long] = searchFilesAVRODigested.zipWithUniqueId().collect().toMap 

val pwResults = new PrintWriter(new File("validation-results.txt")) 

//The result has to be saved to a text file somewhere...
val buffer = StringBuilder.newBuilder 

//Bring the results back to the driver.
val listFilesToBeDigested = searchListToBeDigested.map {line => 
    val resultTemp = mapFileNamesAVRODigested.getOrElse(line,"NOT INGESTED!") 
    var resul = "" 
    if (resultTemp == "NOT INGESTED!"){ 
    resul = "File " + line + " " + resultTemp + "\n" 
    } 
    else{ 
    resul = "File " + line + " " + " is INGESTED!" + "\n" 
    } 
    resul 
}.collect() 

//append the data to the buffer
listFilesToBeDigested.foreach(buffer.append(_)) 
//save the buffer contents to the output text file.
pwResults.write(buffer.toString) 
pwResults.close 
//this boolean must return false in case of an exception or error... 
result 
}// 
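
Everything up to registerTempTable is lazy, so the collect() in the SQL query is simply the first action that forces Spark to read (and decompress) the Avro blocks. A quick way to confirm that the failure comes from the read itself, not from the query, is to force an action right after readRawData. A diagnostic sketch, reusing rawFlumeData and sqlContext from validate above:

//Sketch: force the Avro file to be read before any SQL runs; if Snappy is broken,
//the same error should appear here, pointing at the file read rather than the query.
val probeDF: DataFrame = readRawData(rawFlumeData, sqlContext, rawFormat = "avro")
println("rows read: " + probeDF.count())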

This is the unit test code:

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.junit.{Assert, Test}

private[validation] class ValidateInputCSVFilesTest { 

//AS YOU CAN SEE, I do not WANT to use snappy at all! 
val conf = new SparkConf() 
.setAppName("ValidateInputCSVFilesTest") 
.setMaster("local[2]") 
.set("spark.driver.allowMultipleContexts", "true") 
.set("spark.driver.host", "127.0.0.1") 
.set("spark.io.compression.codec", "lzf") 

val sc = new SparkContext(conf) 
val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

val properties : Properties = new Properties() 
properties.setProperty("frtb.input.csv.validation.avro","./src/test/resources/avro/FlumeData.1482407196579") 
properties.setProperty("frtb.input.csv.validation.list.files","./src/test/resources/fileNames-to-be-digested.txt") 
import sqlContext.implicits._ 

sqlContext.sparkContext.hadoopConfiguration.set("avro.mapred.ignore.inputs.without.extension", "false") 

@Test 
def testValidateInputFiles() = { 

//def validate(rawFlumeData : String = "FlumeData.1482407196579",fileNamesToBeDigested : String = "fileNames-to-be-digested.txt", sqlContext: SQLContext) 
val rawFlumeData = properties.getProperty("frtb.input.csv.validation.avro") 
val fileNamesToBeDigested = properties.getProperty("frtb.input.csv.validation.list.files") 
println("rawFlumeData is " + rawFlumeData) 
println("fileNamesToBeDigested is " + fileNamesToBeDigested) 
val result : Boolean = ValidateInputCSVFiles.validate(rawFlumeData ,fileNamesToBeDigested ,sqlContext,sc) 

Assert.assertTrue("Must be true...",result) 

}//end of test method 

}//end of unit class 
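
Note that spark.io.compression.codec only controls Spark's internal shuffle/broadcast compression; it does not change how the Avro file itself was compressed. If the FlumeData container was written with the snappy codec, reading it still needs snappy-java and its native library regardless of that setting. The codec stored in the container header can be checked with the Avro API that is already on the classpath via spark-avro (a diagnostic sketch):

import java.io.File
import org.apache.avro.file.DataFileReader
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}

//Sketch: print the codec recorded in the Avro container header
//(typically "snappy", "deflate", or null when the file is uncompressed).
val reader = new DataFileReader[GenericRecord](
  new File("./src/test/resources/avro/FlumeData.1482407196579"),
  new GenericDatumReader[GenericRecord]())
println("avro.codec = " + reader.getMetaString("avro.codec"))
reader.close()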

I can run the same code perfectly well in a local spark-shell, using this command:

$ bin/spark-shell --packages org.json4s:json4s-native_2.10:3.5.0 --packages com.databricks:spark-csv_2.10:1.5.0 --packages com.databricks:spark-avro_2.10:2.0.1 

What else can I do?

Thanks in advance.

Answer


The problem was solved when I changed the scope of the Spark dependencies.

This is the part of the pom.xml that solves my problem; now I can run the job with the spark-submit command. With provided scope, the Spark, spark-csv and snappy-java jars are no longer packaged with the application, so at runtime the versions shipped with the cluster's Spark distribution are used instead of the conflicting 1.0.4.1.

<properties> 
    <!-- Generic properties --> 
    <java.version>1.7</java.version> 
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 
    <!-- Dependency versions --> 
    <maven.compiler.source>1.7</maven.compiler.source> 
    <maven.compiler.target>1.7</maven.compiler.target> 
    <scala.version>2.10.4</scala.version> 
    <junit.version>4.11</junit.version> 
    <slf4j.version>1.7.12</slf4j.version> 
    <spark.version>1.5.0-cdh5.5.2</spark.version> 
    <databricks.version>1.5.0</databricks.version> 
    <json4s-native.version>3.5.0</json4s-native.version> 
    <spark-avro.version>2.0.1</spark-avro.version> 
</properties> 

...

<dependencies> 

    <dependency> 
     <groupId>org.json4s</groupId> 
     <artifactId>json4s-native_2.10</artifactId> 
     <version>${json4s-native.version}</version> 
    </dependency> 
    <dependency> 
     <groupId>junit</groupId> 
     <artifactId>junit</artifactId> 
     <version>${junit.version}</version> 
     <scope>test</scope> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.spark</groupId> 
     <artifactId>spark-sql_2.10</artifactId> 
     <version>${spark.version}</version> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-csv_2.10</artifactId> 
     <version>${databricks.version}</version> 
     <scope>provided</scope> 
     <exclusions> 
      <exclusion> 
       <groupId>org.xerial.snappy</groupId> 
       <artifactId>snappy-java</artifactId> 
      </exclusion> 
     </exclusions> 
    </dependency> 

    <dependency> 
     <groupId>org.xerial.snappy</groupId> 
     <artifactId>snappy-java</artifactId> 
     <version>1.0.4.1</version> 
     <scope>provided</scope> 
    </dependency> 
    <dependency> 
     <groupId>com.databricks</groupId> 
     <artifactId>spark-avro_2.10</artifactId> 
     <version>${spark-avro.version}</version> 
    </dependency> 
    <!-- https://mvnrepository.com/artifact/log4j/log4j --> 
    <dependency> 
     <groupId>log4j</groupId> 
     <artifactId>log4j</artifactId> 
     <version>1.2.17</version> 
    </dependency> 
</dependencies> 

...