
I am trying to integrate Apache Spark Streaming and Apache Flume, following this guide. I am trying to set this up in the MapR sandbox.

When I submit the bundled example, JavaFlumeEventCount, everything works fine and all events are counted. I use one terminal to run the Spark job and another terminal to run Flume.
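
For reference, the code I am reusing is essentially that bundled JavaFlumeEventCount example; a rough sketch of it is below (the class name and 2-second batch interval are just illustrative, the host and port are the ones my Flume sink points at):

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Duration; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.flume.FlumeUtils; 
import org.apache.spark.streaming.flume.SparkFlumeEvent; 

public class JavaFlumeEventCount { 
    public static void main(String[] args) throws Exception { 
        SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount"); 
        // 2-second batches 
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000)); 

        // Push-based receiver: Flume's Avro sink connects to this host/port 
        JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = 
            FlumeUtils.createStream(ssc, "localhost", 8002); 

        // Count the events in each batch and print the result 
        flumeStream.count().map(new Function<Long, String>() { 
            @Override 
            public String call(Long count) { 
                return "Received " + count + " flume events."; 
            } 
        }).print(); 

        ssc.start(); 
        ssc.awaitTermination(); 
    } 
} 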

When I use that example code in my own project and build a jar, the job runs fine, but no events are counted and the following exception shows up in the Flume log:

19 Mar 2015 08:32:46,397 ERROR [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.SinkRunner$PollingRunner.run:160) - Unable to deliver event. Exception follows. 
org.apache.flume.EventDeliveryException: Failed to send events 
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:392) 
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68) 
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 8002 }: Failed to send batch 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:311) 
    at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:376) 
    ... 3 more 
Caused by: org.apache.flume.EventDeliveryException: NettyAvroRpcClient { host: localhost, port: 8002 }: Exception thrown from remote handler 
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:393) 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:370) 
    at org.apache.flume.api.NettyAvroRpcClient.appendBatch(NettyAvroRpcClient.java:299) 
    ... 4 more 
Caused by: java.util.concurrent.ExecutionException: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent 
    at org.apache.avro.ipc.CallFuture.get(CallFuture.java:128) 
    at org.apache.flume.api.NettyAvroRpcClient.waitForStatusOK(NettyAvroRpcClient.java:385) 
    ... 6 more 
Caused by: org.apache.avro.AvroRuntimeException: org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to org.apache.flume.source.avro.AvroFlumeEvent 
    at org.apache.avro.ipc.specific.SpecificRequestor.readError(SpecificRequestor.java:126) 
    at org.apache.avro.ipc.Requestor$Response.getResponse(Requestor.java:554) 
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:359) 
    at org.apache.avro.ipc.Requestor$TransceiverCallback.handleResult(Requestor.java:322) 
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.messageReceived(NettyTransceiver.java:517) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at org.apache.avro.ipc.NettyTransceiver$NettyClientAvroHandler.handleUpstream(NettyTransceiver.java:499) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558) 
    at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439) 
    at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303) 
    at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558) 
    at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268) 
    at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255) 
    at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471) 
    at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332) 
    at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35) 
    at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102) 
    at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 

My project has the following pom.xml:

<?xml version="1.0" encoding="UTF-8"?> 
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 
    <modelVersion>4.0.0</modelVersion> 
    <groupId>com.example</groupId> 
    <artifactId>SparkStreamingExample</artifactId> 
    <version>1.0</version> 
    <packaging>jar</packaging> 
    <properties> 
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 
     <maven.compiler.source>1.7</maven.compiler.source> 
     <maven.compiler.target>1.7</maven.compiler.target> 
    </properties> 
    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.10</artifactId> 
      <version>1.2.1</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-flume_2.10</artifactId> 
      <version>1.2.1</version> 
     </dependency> 
    </dependencies> 
    <build> 
     <plugins> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-shade-plugin</artifactId> 
       <version>2.3</version> 
       <executions> 
        <execution> 
         <phase>package</phase> 
         <goals> 
          <goal>shade</goal> 
         </goals> 
         <configuration> 
          <artifactSet> 
           <excludes> 
            <exclude>org.apache.spark:spark-streaming_2.10</exclude> 
            <exclude>org.apache.spark:spark-core_2.10</exclude> 
           </excludes> 
          </artifactSet> 
          <filters> 
           <filter> 
            <artifact>*:*</artifact> 
            <excludes> 
             <exclude>META-INF/*.SF</exclude> 
             <exclude>META-INF/*.DSA</exclude> 
             <exclude>META-INF/*.RSA</exclude> 
            </excludes> 
           </filter> 
          </filters> 
         </configuration> 
        </execution> 
       </executions> 
      </plugin> 
     </plugins> 
    </build> 
</project> 

Why doesn't this work?

Answer


I managed to fix this by adding the following dependencies to the pom.xml. My project was pulling in an older Avro version (1.7.3):

<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro-ipc</artifactId> 
    <version>1.7.7</version> 
</dependency> 
<dependency> 
    <groupId>org.apache.avro</groupId> 
    <artifactId>avro</artifactId> 
    <version>1.7.7</version> 
</dependency>
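
The ClassCastException in the Flume log (GenericData$Record cannot be cast to AvroFlumeEvent) appears to be a symptom of exactly this Avro version mismatch: with the older avro/avro-ipc on the classpath, the Avro RPC layer falls back to generic records instead of the specific AvroFlumeEvent type that the Flume Avro sink and the Spark Flume receiver expect. Pinning both avro and avro-ipc to 1.7.7 keeps the two sides consistent. If you are not sure which version ends up in the shaded jar, running mvn dependency:tree on the project shows where the older 1.7.3 artifacts were coming from.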