Я запускаю готовый EMR-кластер, используя Spark 1.6.0 и Zeppelin 0.5.6 on AWS. Моя цель - инициализировать простой контекст Spark Streaming и указать на внутренний поток Kinesis, как на доказательство концепции. Однако, когда я запускаю его я получаю:Spark Streaming 1.6.0 EMR с использованием Python: ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
Py4JJavaError: An error occurred while calling o89.loadClass. :
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
код я бегу (через Zeppelin) просто:
%pyspark
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
ssc = StreamingContext(sc, 1)
appName = '{my-app-name}'
streamName = '{my-stream-name}'
endpointUrl = '{my-endpoint}'
regionName = '{my-region}'
lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2)
Когда я наткнулся на это на местном уровне, я уверен построить искровым потоковый-KINESIS-ASL от источника и включают в себя эти банки в моей искровой конфигурации:
spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/*
Однако, я не могу показаться, чтобы получить эту работу, когда на ОМ. Чтобы быть в безопасности, я включаю его в следующее, безрезультатно:
spark.driver.extraClassPath
spark.driver.extraLibraryPath
spark.executor.extraClassPath
spark.executor.extraLibraryPath
Кто-нибудь сталкивался с этим раньше? Я распечатываю конфигурацию искры, когда я перезапускаю контекст, чтобы подтвердить, что эти изменения подбираются. Может быть, это нужно делать и на подчиненных узлах? Или, возможно, еще один вариант/ключ конфигурации?
Это сделало трюк! Для всех, кто ищет дополнительную информацию, вы можете найти полную документацию Zeppelin [здесь] (https://zeppelin.incubator.apache.org/docs/0.5.5-incubating/interpreter/spark.html) –