У меня проблема при записи данных в mongo после чтения и отображения данных.Невозможно написать mongodb после сопоставления в Spark Scala
Это сценарий, который я использую для запуска программы.
Я использую Spark 1.4.0
, Scala 2.11.7
и mongo 2.6.10
#!/usr/bin/env bash
SPARK_PATH="/Users/username/spark-1.4.0-bin-hadoop2.6/bin/spark-submit"
CLASS_NAME="com.knx.conversion.ScalaWordCount"
CLUSTER='local[2]'
JARS="/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-hadoop-core-1.4.0.jar,/Users/username/spark-1.4.0-bin-hadoop2.6/lib/mongo-java-driver-3.0.3.jar"
JAR="/Users/username/AggragateConversionFunnel/target/scala-2.11/aggragateconversionfunnel_2.11-1.0.jar"
PROJECT_PATH="/Users/username/AggragateConversionFunnel"
cd ${PROJECT_PATH} && sbt package
${SPARK_PATH} --class ${CLASS_NAME} --master ${CLUSTER} --jars ${JARS} $JAR
и здесь основная программа здесь. Просто скопируйте из [здесь] [1] и измените коллекцию выходных данных.
package com.knx.conversion
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.hadoop.conf.Configuration
import org.bson.BSONObject
import org.bson.BasicBSONObject
object ScalaWordCount {
def main(args: Array[String]) {
val sc = new SparkContext("local", "Scala Word Count")
val config = new Configuration()
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/first-week.interactions")
config.set("mongo.output.uri", "mongodb://127.0.0.1:27017/visit_06_2015.output")
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
// Input contains tuples of (ObjectId, BSONObject)
// Output contains tuples of (null, BSONObject) - ObjectId will be generated by Mongo driver if null
val countsRDD = mongoRDD.flatMap(arg => {
val str = arg._2.get("referer").toString
str.split("h")
})
.map(word => (word, 1))
.reduceByKey((a, b) => a + b)
countsRDD.foreach(println)
val saveRDD = countsRDD.map((tuple) => {
val bson = new BasicBSONObject()
bson.put("word", tuple._1)
bson.put("count", tuple._2.toString)
(null, bson)
})
// Only MongoOutputFormat and config are relevant
saveRDD.saveAsNewAPIHadoopFile("file:///bogus", classOf[Any], classOf[Any], classOf[com.mongodb.hadoop.MongoOutputFormat[Any, Any]], config)
}
}
При запуске я получил ошибку
5/07/24 15:53:03 INFO DAGScheduler: Job 0 finished: foreach at ScalaWordCount.scala:39, took 1.111442 s
Exception in thread "main" java.lang.NoSuchMethodError: scala.Predef$.$conforms()Lscala/Predef$$less$colon$less;
at com.knx.conversion.ScalaWordCount$.main(ScalaWordCount.scala:48)
at com.knx.conversion.ScalaWordCount.main(ScalaWordCount.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/07/24 15:53:03 INFO SparkContext: Invoking stop() from shutdown hook
Просто не знаю, почему и как это произошло.
[1]: https://github.com/plaa/mongo-spark/blob/master/src/main/scala/ScalaWordCount.scala
Проверьте, что JAR, который вы отправляете Spark, может иметь несколько версий Scala libs, что может привести к использованию плохого. – cchantep