2016-06-14 4 views
1

Я искал среднего рода, используя искру и нашел это решение:Как выполнить Secondary Sort in Spark?

case class RFMCKey(cId: String, R: Double, F: Double, M: Double, C: Double) 
class RFMCPartitioner(partitions: Int) extends Partitioner { 
    require(partitions >= 0, "Number of partitions ($partitions) cannot be negative.") 
    override def numPartitions: Int = partitions 
    override def getPartition(key: Any): Int = { 
    val k = key.asInstanceOf[RFMCKey] 
    k.cId.hashCode() % numPartitions 
    } 
} 
object RFMCKey { 
    implicit def orderingBycId[A <: RFMCKey] : Ordering[A] = { 
    Ordering.by(k => (k.R, k.F * -1, k.M * -1, k.C * -1)) 
    } 
} 

Теперь это код, который я использую для (Давность, частота, валютный, Clumpiness) программы моей RFMC. В том же коде, в конце концов, я делаю:

val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)) 

Но когда я загрузить этот файл в spark-shell, я получаю следующее сообщение об ошибке:

<console>:130: error: RFMCKey is already defined as (compiler-generated) case class companion object RFMCKey 
      object RFMCKey { 
        ^
<console>:198: error: RFMCKey.type does not take parameters 
           case (custId, (((rVal, fVal), mVal),cVal)) => (RFMCKey(custId, rVal, fVal, mVal, cVal), rVal+","+fVal+","+mVal+","+cVal) 
                              ^
<console>:200: error: value repartitionAndSortWithinPartitions is not a member of org.apache.spark.rdd.RDD[Nothing] 
val rfmcTableSorted = rfmcTable.repartitionAndSortWithinPartitions(new RFMCPartitioner(1)).cache() 

Как обойти эту проблему?

Update 1

Я попытался изменить порядок декларирования моего дела класса и класса объекта и удивительно оболочки загружен файл, не бросать какие-либо ошибки. Но когда я запустил свою программу, он бросил новую ошибку:

org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623) 
at org.apache.spark.rdd.RDD.map(RDD.scala:286) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$.constructRFMC(<console>:113) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:43) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:45) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:47) 
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:49) 
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:51) 
at $iwC$$iwC$$iwC.<init>(<console>:53) 
at $iwC$$iwC.<init>(<console>:55) 
at $iwC.<init>(<console>:57) 
at <init>(<console>:59) 
at .<init>(<console>:63) 
at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) 
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) 
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) 
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) 
at org.apache.spark.repl.Main$.main(Main.scala:31) 
at org.apache.spark.repl.Main.main(Main.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$ 
Serialization stack: 
    - object not serializable (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$, value: [email protected]) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun$17, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$) 
    - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$rfmc$$anonfun$17, <function1>) 
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
    ... 52 more 

Update 2

Путь я определяю свои объекты и функции, как это:

object rfmc { 
    def constructrfmc() = { 
    // Everything goes inside including the custom key and partitioner 
    // code defined above 
    } 
} 

Update 3

Способ, которым я определяю свой код в eclipse, который отлично работает:

object rfmc extends App { 
    // Everything goes inside including the custom key and partitioner 
    // code defined above 
} 

Я также создал JAR для этого кода и побежал с помощью spark-submit и, что тоже работал отлично.

+0

Возможно, ваша проблема связана с этим: https://issues.scala-lang.org/browse/SI-3772. Возможно, попробуйте пообщаться с порядком объявления вашего класса case и вашего объекта. –

+0

У меня нет времени, чтобы ответить, но вы взглянули на это: http://codingjunkie.net/spark-secondary-sort/ – Vale

+0

@Vale Я на самом деле сделал то же самое. –

ответ

1

Чтобы решить проблему, которую уже определили RFMCKey, вам необходимо поменять порядок вашего класса класса и объявления объекта, как описано в this issue.

Что касается ваших обновлений, могут быть некоторые ограничения в spark-shell, которые не могут позволить выполнять произвольный код (например, с аккумуляторами). Чтобы получить больше информации о механизме сериализации, вы должны передать следующий параметр -Dsun.io.serialization.extendedDebugInfo=true. Помните, что искровая оболочка - скорее исследовательская утилита для тестирования небольших фрагментов кода или новых функций итеративно благодаря REPL, а не полноценной готовой к выпуску утилите, которая должна широко использоваться для тестирования вашего кода.

Ваш безопасный вариант заключается в том, чтобы упаковать ваше приложение в банку и установить Spark в standalone mode и запустить spark-submit с вашей упакованной банкой. Как показано в обновлении 3 и 4 вашего сообщения, вам необходимо обновить свой код, чтобы обернуть его в объект, чтобы он стал точкой входа вашей работы. Это позволит вам убедиться, что ваш код здесь не виноват.