1

Я запускаю готовый EMR-кластер, используя Spark 1.6.0 и Zeppelin 0.5.6 on AWS. Моя цель - инициализировать простой контекст Spark Streaming и указать на внутренний поток Kinesis, как на доказательство концепции. Однако, когда я запускаю его я получаю:Spark Streaming 1.6.0 EMR с использованием Python: ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper

Py4JJavaError: An error occurred while calling o89.loadClass. : 
java.lang.ClassNotFoundException: org.apache.spark.streaming.kinesis.KinesisUtilsPythonHelper 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381) 
    at py4j.Gateway.invoke(Gateway.java:259) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 

код я бегу (через Zeppelin) просто:

%pyspark 
from pyspark.streaming import StreamingContext 
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream 

ssc = StreamingContext(sc, 1) 

appName = '{my-app-name}' 
streamName = '{my-stream-name}' 
endpointUrl = '{my-endpoint}' 
regionName = '{my-region}' 

lines = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, 2) 

Когда я наткнулся на это на местном уровне, я уверен построить искровым потоковый-KINESIS-ASL от источника и включают в себя эти банки в моей искровой конфигурации:

spark.driver.extraClassPath /path/to/kinesis/asl/assembly/jars/* 

Однако, я не могу показаться, чтобы получить эту работу, когда на ОМ. Чтобы быть в безопасности, я включаю его в следующее, безрезультатно:

spark.driver.extraClassPath 
spark.driver.extraLibraryPath 
spark.executor.extraClassPath 
spark.executor.extraLibraryPath 

Кто-нибудь сталкивался с этим раньше? Я распечатываю конфигурацию искры, когда я перезапускаю контекст, чтобы подтвердить, что эти изменения подбираются. Может быть, это нужно делать и на подчиненных узлах? Или, возможно, еще один вариант/ключ конфигурации?

ответ

2

Добавить зависимость от zeppelin context "z". Heres пример добавления пакета sparkcsv

%dep 
z.load("com.databricks:spark-csv_2.11:1.3.0") 
+0

Это сделало трюк! Для всех, кто ищет дополнительную информацию, вы можете найти полную документацию Zeppelin [здесь] (https://zeppelin.incubator.apache.org/docs/0.5.5-incubating/interpreter/spark.html) –