2015-09-24 4 views
5

Я только начинаю флинк. Я написал следующий код и получил «выходов DATASOURCE вызвали ошибку: Не удалось прочитать код обертки пользователя» ОшибкаFlink: выходы DataSource вызвали ошибку: не удалось прочитать оболочку кода пользователя

Есть ли что я делаю неправильно?

версия: Flink v 0.9.1 (Hadoop 1) не используется Hadoop: Локальные выполнения оболочки: SCALA оболочки

Код:

val env = ExecutionEnvironment.getExecutionEnvironment 
val text = env.readTextFile("/home/ashish/Downloads/spark/synop.201501.csv" 
val data_split = text.flatMap{_.split(';')} 
data_split.first(3).print() 

Примечание: входной файл использует ';' в deliminator

Ошибка:

Scala-Flink> val data_split = text.flatMap{_.split(';')} 
data_split: org.apache.flink.api.scala.DataSet[String] = [email protected] 
Scala-Flink> data_split.first(3).print() 
09/24/2015 09:20:14 Job execution switched to status RUNNING. 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to SCHEDULED 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to DEPLOYING 
09/24/2015 09:20:14 CHAIN DataSource (at $line26.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:14) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$.<init>(<console>:15))(1/1) switched to FAILED 
java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $line27.$read$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 

09/24/2015 09:20:14 Job execution switched to status FAILING. 
09/24/2015 09:20:14 CHAIN GroupReduce (GroupReduce at org.apache.flink.api.scala.DataSet.first(DataSet.scala:707)) -> FlatMap (collect())(1/1) switched to CANCELED 
09/24/2015 09:20:14 DataSink (collect() sink)(1/1) switched to CANCELED 
org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:43) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.Exception: Call to registerInputOutput() of invokable failed 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:504) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:89) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:501) 
    ... 1 more 
Caused by: org.apache.flink.runtime.operators.util.CorruptConfigurationException: Could not read the user code wrapper: $anonfun$1 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:284) 
    at org.apache.flink.runtime.operators.RegularPactTask.instantiateUserCode(RegularPactTask.java:1507) 
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.setup(ChainedFlatMapDriver.java:39) 
    at org.apache.flink.runtime.operators.chaining.ChainedDriver.setup(ChainedDriver.java:72) 
    at org.apache.flink.runtime.operators.RegularPactTask.initOutputs(RegularPactTask.java:1378) 
    at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:290) 
    at org.apache.flink.runtime.operators.DataSourceTask.registerInputOutput(DataSourceTask.java:86) 
    ... 2 more 
Caused by: java.lang.ClassNotFoundException: $anonfun$1 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357) 
    at java.lang.Class.forName0(Native Method) 
    at java.lang.Class.forName(Class.java:348) 
    at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:71) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) 
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:302) 
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:264) 
    at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:282) 
    ... 8 more 
+1

Немногие, чтобы помочь ответить на это: (1) Третью строку (получение новой среды исполнения) не следует удалять. Смешение различных сред, вероятно, вызовет проблемы (и может быть причиной вашей проблемы здесь, на самом деле). (2) Можете ли вы разместить полную трассировку стека исключений. Отсутствует корневая причина, она должна быть «вызвана» еще дальше в трассировке стека. (3) В вашем примере кода есть усеченные линии, можете ли вы опубликовать полные строки? –

+0

val env должен был быть первой строкой ... Мне жаль, что .val env = ExecutionEnvironment.getExecutionEnvironmentval text = env.readTextFile ("/ home/ashish/Downloads/spark/synop.201501.csv" val data_split = text.flatMap {_. split (';')} data_split.first (3) .print() – ashish

+0

Я обновил полный журнал ошибок – ashish

ответ

2

Проблема является утверждение "Вэл окр = ExecutionEnvironment.getExecutionEnvironment" в первой строке.

Scala Shell уже имеет ExecutionEnvironment, привязанную к переменной env, которая настроена для правильной загрузки классов, сгенерированных Shell.

Создав новую среду ExecutionEnvironment, вы переопределите эту предварительно сконфигурированную среду с тем, который неправильно настроен.

+0

Stephan, не могли бы вы указать ссылку на то, как должна быть программа flink, локально протестированная в repl? У меня такая же проблема с проектом flink 1.2, который развивается в среде emacs-ensime. 'env.readTextFile (" file:/d: /data/test.csv "). first (5) .print()' работает хорошо, однако 'env.rea dTextFile ("file:/d: /data/test.csv"). map (_. split ('')). first (5) .print() '- not. –

+0

И он корректно работает в windows cmd в консоли sbt. В ensime-inferior-scala 'org.apache.flink.runtime.operators.util.CorruptConfigurationException: Не удалось прочитать оболочку кода пользователя: $ anonfun $ 1'. –