2015-07-24 1 views
0

У меня проблема при записи данных в mongo после чтения и отображения данных.Невозможно написать mongodb после сопоставления в Spark Scala

Это сценарий, который я использую для запуска программы.

Я использую Spark 1.4.0, Scala 2.11.7 и mongo 2.6.10

#!/usr/bin/env bash 
    SPARK_PATH="/Users/username/spark-1.4.0-bin-hadoop2.6/bin/spark-submit" 
    CLASS_NAME="com.knx.conversion.ScalaWordCount" 
    CLUSTER='local[2]' 
    JARS="/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-hadoop-core-1.4.0.jar,/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-java-driver-3.0.3.jar" 
    JAR="/Users/username/AggragateConversionFunnel/target/scala-2.11/aggragateconversionfunnel_2.11-1.0.jar" 
    PROJECT_PATH="/Users/username/AggragateConversionFunnel" 
    cd ${PROJECT_PATH} && sbt package 
    ${SPARK_PATH} --class ${CLASS_NAME} --master ${CLUSTER} --jars ${JARS} $JAR 

и здесь основная программа здесь. Просто скопируйте из [здесь] [1] и измените коллекцию выходных данных.

package com.knx.conversion 

import org.apache.spark.SparkContext 
import org.apache.spark.SparkContext._ 
import org.apache.hadoop.conf.Configuration 
import org.bson.BSONObject 
import org.bson.BasicBSONObject 

object ScalaWordCount { 

    def main(args: Array[String]) { 

    val sc = new SparkContext("local", "Scala Word Count") 

    val config = new Configuration() 
    config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/first-week.interactions") 
    config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/visit_06_2015.output") 

    val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject]) 

    // Input contains tuples of (ObjectId, BSONObject) 
    // Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null 
    val countsRDD = mongoRDD.flatMap(arg => { 
     val str = arg._2.get("referer").toString 
     str.split("h") 
    }) 
     .map(word => (word, 1)) 
     .reduceByKey((a, b) => a + b) 
    countsRDD.foreach(println) 

    val saveRDD = countsRDD.map((tuple) => { 
     val bson = new BasicBSONObject() 
     bson.put("word", tuple._1) 
     bson.put("count", tuple._2.toString) 
     (null, bson) 
    }) 
    // Only MongoOutputFormat and config are relevant 
    saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config) 

    } 
} 

При запуске я получил ошибку

5/07/24 15:53:03 INFO DAGScheduler: Job 0 finished: foreach at ScalaWordCount.scala:39, took 1.111442 s 
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less; 
    at com.knx.conversion.ScalaWordCount$.main(ScalaWordCount.scala:48) 
    at com.knx.conversion.ScalaWordCount.main(ScalaWordCount.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
15/07/24 15:53:03 INFO SparkContext: Invoking stop() from shutdown hook 

Просто не знаю, почему и как это произошло.

[1]: https://github.com/plaa/mongo-spark/blob/master/src/main/scala/ScalaWordCount.scala 
+1

Проверьте, что JAR, который вы отправляете Spark, может иметь несколько версий Scala libs, что может привести к использованию плохого. – cchantep

ответ

0

Это выдано около Scala version, что я в настоящее время использую не сопрягать с Спарк Scala версии. Я использую Scala 2.11.7, чтобы скомпилировать и упаковать банку, но Spark 1.4.1 использует Scala 2.10.4.

Ответ, который я нашел here.

Тогда эта проблема решается путем переключения версии Scala на 2.10.4.