2015-04-15 7 views
3

How to deserialize a Pipeline model in spark.ml?

I have serialized a Spark ML Pipeline model consisting of several Transformers (org.apache.spark.ml.Transformer) and several logistic regression learners (org.apache.spark.ml.classification.LogisticRegression). Everything works fine on my Windows machine, where I created the model. I serialized the model to disk with java.io.ObjectOutputStream and read it back with java.io.ObjectInputStream.

Everything also works fine through sbt and my corresponding unit tests. However, when I package my code into a jar and try to run the same code in the Spark shell on my server, I get a ClassNotFoundException for my (first) transformer. The stack trace is below.

I set fork := true in my build.sbt, since I thought this might be related to "SparkSQL MissingRequirementError when registering table", but that did not seem to help.

It also makes no difference whether I try to load the model from the jar file or directly from disk (as shown below).

How do I deserialize the pipeline?

Welcome to Ubuntu 14.04.2 LTS (GNU/Linux 3.13.0-49-generic x86_64) 

spark-shell -J-Xmx4g --master local[2] --jars myJar.jar 
Spark assembly has been built with Hive, including Datanucleus jars on classpath 

Welcome to 
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.3.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40) 
Type in expressions to have them evaluated. 
Type :help for more information. 
Spark context available as sc. 
SQL context available as sqlContext. 

scala> import com.myCompany.spark.classifier._ 
import com.myCompany.spark.classifier._ 
scala> import java.io._ 
import java.io._ 

scala> val l = new LabelTransformer(2) 
l: com.myCompany.spark.classifier.LabelTransformer = com.myCompany.spark.classifier.LabelTransformer@…

scala> val l2 = new ObjectInputStream(new FileInputStream("log-reg_2_10.model")).readObject 
java.lang.ClassNotFoundException: com.myCompany.spark.classifier.LabelTransformer 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:626) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1707) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1345) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142) 
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142) 
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105) 
    at scala.collection.mutable.HashMap.init(HashMap.scala:39) 
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142) 
    at scala.collection.mutable.HashMap$$anonfun$readObject$1.apply(HashMap.scala:142) 
    at scala.collection.mutable.HashTable$class.init(HashTable.scala:105) 
    at scala.collection.mutable.HashMap.init(HashMap.scala:39) 
    at scala.collection.mutable.HashMap.readObject(HashMap.scala:142) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) 

    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:44) 
    at $iwC$$iwC$$iwC.<init>(<console>:46) 
    at $iwC$$iwC.<init>(<console>:48) 
    at $iwC.<init>(<console>:50) 
    at <init>(<console>:52) 
    at .<init>(<console>:56) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1338) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:856) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:901) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:813) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:656) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:664) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:669) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:996) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:944) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:944) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1058) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:569) 
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166) 
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
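
A possible explanation, offered as an assumption rather than something confirmed in this thread: the trace above shows java.io.ObjectInputStream.resolveClass asking sun.misc.Launcher$AppClassLoader for the class, while jars passed with --jars are usually visible only through a child class loader (the one the shell itself uses, which is why new LabelTransformer(2) succeeds). The sketch below overrides resolveClass so that the thread's context class loader is tried first. The names ContextClassLoaderObjectInputStream and SerializationSketch are hypothetical and not part of Spark or the JDK.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream}
import java.io.{ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical helper: an ObjectInputStream that resolves classes through
// the thread's context class loader before using the default lookup.
class ContextClassLoaderObjectInputStream(in: InputStream) extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] = {
    val loader = Thread.currentThread().getContextClassLoader
    try {
      // initialize = false: only locate the class here.
      Class.forName(desc.getName, false, loader)
    } catch {
      // Primitive types, and classes the context loader cannot see,
      // fall back to the standard ObjectInputStream behaviour.
      case _: ClassNotFoundException => super.resolveClass(desc)
    }
  }
}

object SerializationSketch {
  // Serialize any Serializable object to a byte array.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Deserialize using the context-class-loader-aware stream.
  def deserialize[T](data: Array[Byte]): T = {
    val in = new ContextClassLoaderObjectInputStream(new ByteArrayInputStream(data))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

In the shell session above, this would mean wrapping the FileInputStream for "log-reg_2_10.model" in ContextClassLoaderObjectInputStream instead of a plain ObjectInputStream. Whether that resolves the error depends on the context class loader actually seeing myJar.jar, which has not been verified here.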

Answers

1

Since 1.6.0 you can use the (experimental) Pipeline.read.load(path) to load a Pipeline object from path.
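
A minimal sketch of that persistence API, assuming Spark 1.6.0 or later on the classpath and an environment where a SparkContext is available (such as spark-shell). The object and method names are hypothetical. Note that a fitted model is a PipelineModel, which is read back with PipelineModel.load, while an unfitted Pipeline uses Pipeline.read.load. This replaces Java serialization entirely, so the model would have to be saved again in this format.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}

// Hypothetical helpers wrapping Spark's ML persistence calls.
object PipelinePersistenceSketch {
  // Write a fitted pipeline model to `path`, replacing any existing output.
  def saveModel(model: PipelineModel, path: String): Unit =
    model.write.overwrite().save(path)

  // Read a fitted pipeline model back from `path`.
  def loadModel(path: String): PipelineModel =
    PipelineModel.load(path)

  // Read an unfitted pipeline (an estimator) back from `path`.
  def loadPipeline(path: String): Pipeline =
    Pipeline.read.load(path)
}
```

Saving only succeeds when every stage in the pipeline supports this kind of persistence.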

1

It is indeed experimental. Since a Pipeline can contain all sorts of things, each of its stages must itself be readable/writable for the Pipeline to be saved/loaded. For example, RandomForests are not yet supported, and will not be until Spark v2.0.0: https://issues.apache.org/jira/browse/SPARK-13784
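
One way to see in advance which stages would block saving is to look for stages that do not mix in org.apache.spark.ml.util.MLWritable. The following is only a sketch under that assumption: the object and method names are hypothetical, and it expects the pipeline's stages to have been set already.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.util.MLWritable

// Hypothetical helper, not part of Spark.
object PersistenceCheckSketch {
  // Returns the stages that are not MLWritable and so cannot be saved
  // as part of the pipeline. Assumes setStages has been called.
  def nonWritableStages(pipeline: Pipeline): Seq[PipelineStage] =
    pipeline.getStages.filterNot(_.isInstanceOf[MLWritable]).toSeq
}
```

An empty result suggests that saving the pipeline should at least get past the stage check. A custom transformer such as the LabelTransformer from the question would presumably show up in this list unless it implements MLWritable itself.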
