2015-11-08

I am trying to get Spark working with Flume. The Flume configuration and classpath are below:

#Declare 
log.sources = src 
log.sinks = spark 
log.channels = chs 

#Define Source 

log.sources.src.type = exec 
log.sources.src.command = sh /home/user/shell/flume.sh 

#Define Sink 
log.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink 
log.sinks.spark.hostname = localhost 
log.sinks.spark.port = 9999 
log.sinks.spark.channel = chs 

#Define Channels 

log.channels.chs.type = memory 

#Tie Source and Sink to Channel 

log.sinks.snk.channel = chs 
log.sources.src.channels = chs 

$ ls -lrt $FLUME_CLASSPATH

-rw-r--r-- 1 root root 7126372 Mar 18  2014 scala-library-2.10.4.jar 
-rw-r--r-- 1 root root  412739 Apr  6  2014 commons-lang3-3.3.2.jar 
-rw-r--r-- 1 root root   86020 Sep 24 00:15 spark-streaming-flume-sink_2.10-1.5.1.jar 
-rw-r--r-- 1 root root 7126003 Nov  7 19:09 scala-library-2.10.3.jar 
-rw-r--r-- 1 root root   82325 Nov  7 19:26 spark-streaming-flume-sink_2.11-1.2.0.jar

$ flume-ng agent -f simplelogger.conf -n log

15/11/07 19:48:05 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 
15/11/07 19:48:05 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:simplelogger.conf 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:snk 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Added sinks: spark Agent: log 
15/11/07 19:48:05 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [log] 
15/11/07 19:48:05 INFO node.AbstractConfigurationProvider: Creating channels 
15/11/07 19:48:05 INFO channel.DefaultChannelFactory: Creating instance of channel chs type memory 
15/11/07 19:48:05 INFO node.AbstractConfigurationProvider: Created channel chs 
15/11/07 19:48:05 INFO source.DefaultSourceFactory: Creating instance of source src, type exec 
15/11/07 19:48:05 INFO sink.DefaultSinkFactory: Creating instance of sink: spark, type: org.apache.spark.streaming.flume.sink.SparkSink 
15/11/07 19:48:05 ERROR node.PollingPropertiesFileConfigurationProvider: Failed to start agent because dependencies were not found in classpath. Error follows. 
java.lang.NoClassDefFoundError: scala/Function1 
     at java.lang.Class.forName0(Native Method) 
     at java.lang.Class.forName(Class.java:190) 
     at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:67) 
     at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:41) 
     at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:415) 
     at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103) 
     at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:744) 
Caused by: java.lang.ClassNotFoundException: scala.Function1 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:366) 
     at java.net.URLClassLoader$1.run(URLClassLoader.java:355) 
     at java.security.AccessController.doPrivileged(Native Method) 
     at java.net.URLClassLoader.findClass(URLClassLoader.java:354) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:425) 
     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) 
     at java.lang.ClassLoader.loadClass(ClassLoader.java:358) 
     ... 14 more 

There is also a plugins.d folder in the pwd (where I keep the flume conf):

plugins.d/:

plugins.d/spark:

plugins.d/spark/lib:

-rw-r--r-- 1 rgopalk rgopalk 82325 Nov  7 19:31 spark-streaming-flume-sink_2.11-1.2.0.jar

Any pointers, please?

PS: Having multiple versions of the spark-streaming-flume-sink jar and of scala-library in FLUME_CLASSPATH makes no difference. The error is the same with a single version of each.
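For reference, the classpath listing above mixes Scala 2.10 library jars with a `_2.11` sink jar, and `NoClassDefFoundError: scala/Function1` is the classic symptom of the Scala runtime not being visible to (or binary-incompatible with) the class being loaded. A quick way to spot mixed Scala binary versions from jar filenames is sketched below; the helper name and regexes are illustrative, not part of the original setup:

```python
import re

def scala_binary_versions(jar_names):
    """Collect the Scala binary versions implied by jar filenames.

    scala-library-2.10.4.jar      -> binary version 2.10
    some-artifact_2.11-1.2.0.jar  -> binary version 2.11 (sbt cross-build suffix)
    """
    versions = set()
    for name in jar_names:
        # Match the Scala runtime jar itself, else the cross-build suffix.
        m = re.search(r"scala-library-(\d+\.\d+)", name)
        if not m:
            m = re.search(r"_(\d+\.\d+)-", name)
        if m:
            versions.add(m.group(1))
    return versions

# Jar names taken from the FLUME_CLASSPATH listing above
jars = [
    "scala-library-2.10.4.jar",
    "commons-lang3-3.3.2.jar",
    "spark-streaming-flume-sink_2.10-1.5.1.jar",
    "scala-library-2.10.3.jar",
    "spark-streaming-flume-sink_2.11-1.2.0.jar",
]
print(sorted(scala_binary_versions(jars)))  # ['2.10', '2.11'] -> mismatch
```

More than one binary version in the result means at least one artifact cannot link against the scala-library on the classpath.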

Answer

I copied all the jar files listed above into {FLUME_INSTALLATION_DIR}/lib. I also copied {SPARK_HOME}/lib/spark-assembly into {FLUME_INSTALLATION_DIR}/lib, and it started working:

SLF4J: Class path contains multiple SLF4J bindings. 
SLF4J: Found binding in [jar:file:/usr/lib/flume/lib/slf4j-log4j12-1.6.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: Found binding in [jar:file:/usr/lib/flume/lib/spark-assembly-1.5.1-hadoop2.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. 
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 
15/11/07 21:18:15 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting 
15/11/07 21:18:15 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:simplelogger.conf 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:snk 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Processing:spark 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Added sinks: spark Agent: log 
15/11/07 21:18:15 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [log] 
15/11/07 21:18:15 INFO node.AbstractConfigurationProvider: Creating channels 
15/11/07 21:18:15 INFO channel.DefaultChannelFactory: Creating instance of channel chs type memory 
15/11/07 21:18:15 INFO node.AbstractConfigurationProvider: Created channel chs 
15/11/07 21:18:15 INFO source.DefaultSourceFactory: Creating instance of source src, type exec 
15/11/07 21:18:15 INFO sink.DefaultSinkFactory: Creating instance of sink: spark, type: org.apache.spark.streaming.flume.sink.SparkSink 
15/11/07 21:18:15 INFO sink.SparkSink: Configured Spark Sink with hostname: localhost, port: 9999, poolSize: 10, transactionTimeout: 60, backoffInterval: 200 
15/11/07 21:18:15 INFO node.AbstractConfigurationProvider: Channel chs connected to [src, spark] 
15/11/07 21:18:15 INFO node.Application: Starting new configuration:{ sourceRunners:{src=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource{name:src,state:IDLE} }} sinkRunners:{spark=SinkRunner: { policy:[email protected] counterGroup:{ name:null counters:{} } }} channels:{chs=org.apache.flume.channel.MemoryChannel{name: chs}} } 
15/11/07 21:18:15 INFO node.Application: Starting Channel chs 
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: chs, registered successfully. 
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: chs started 
15/11/07 21:18:15 INFO node.Application: Starting Sink spark 
15/11/07 21:18:15 INFO sink.SparkSink: Starting Spark Sink: spark on port: 9999 and interface: localhost with pool size: 10 and transaction timeout: 60. 
15/11/07 21:18:15 INFO node.Application: Starting Source src 
15/11/07 21:18:15 INFO source.ExecSource: Exec source starting with command:sh /home/rgopalk/shell/flume.sh 
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: SOURCE, name: src, registered successfully. 
15/11/07 21:18:15 INFO instrumentation.MonitoredCounterGroup: Component type: SOURCE, name: src started 
15/11/07 21:18:16 INFO sink.SparkSink: Starting Avro server for sink: spark 
15/11/07 21:18:16 INFO sink.SparkSink: Blocking Sink Runner, sink will continue to run..
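As a follow-up check after copying jars around, it can help to verify that some jar in Flume's lib directory actually contains the class the loader failed on (`scala/Function1`). Since a jar is just a zip archive, this is easy to script; the sketch below demonstrates the idea on a synthetic jar built in a temp directory (the helper name and fake jar are illustrative):

```python
import os
import tempfile
import zipfile

def jars_with_class(jar_paths, class_entry):
    """Return the jars whose archive index contains the given class entry."""
    hits = []
    for path in jar_paths:
        with zipfile.ZipFile(path) as jar:
            if class_entry in jar.namelist():
                hits.append(path)
    return hits

# Demonstration on a synthetic jar (real usage would glob /usr/lib/flume/lib/*.jar)
tmpdir = tempfile.mkdtemp()
fake_jar = os.path.join(tmpdir, "scala-library-2.10.4.jar")
with zipfile.ZipFile(fake_jar, "w") as jar:
    jar.writestr("scala/Function1.class", b"\xca\xfe\xba\xbe")

print(jars_with_class([fake_jar], "scala/Function1.class"))  # -> [path of the fake jar]
```

If `scala/Function1.class` appears in none of the jars on the agent's classpath, the `NoClassDefFoundError` above is exactly what one should expect.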