
NumberFormatException when trying to run JavaStreamingContext

I am trying to access the Twitter API from my Spark program, which is written in Java. When I run the code, I get the following error:

Caused by: java.lang.NumberFormatException: Not a version:

My code:

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.spark.SparkConf; 
import org.apache.spark.api.java.function.FlatMapFunction; 
import org.apache.spark.api.java.function.Function; 
import org.apache.spark.streaming.Durations; 
import org.apache.spark.streaming.api.java.JavaDStream; 
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; 
import org.apache.spark.streaming.api.java.JavaStreamingContext; 
import org.apache.spark.streaming.twitter.*; 

import twitter4j.Status; 

public class JavaStreamingCount { 

    public static void main(String[] args) throws Exception { 

        SparkConf sc = new SparkConf() 
                .setAppName("JavaNetworkWordCount") 
                .setMaster("local[2]") 
                .setJars(new String[]{System.getProperty("user.dir") + "/target/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar"}); 

        JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1)); 

        Properties props = new Properties(); 
        props.setProperty("oauth.consumerKey", "<my-key>"); 
        props.setProperty("oauth.consumerSecret", "<my-key>"); 
        props.setProperty("oauth.accessToken", "<my-key>"); 
        props.setProperty("oauth.accessTokenSecret", "<my-key>"); 

        System.setProperties(props); 

        JavaReceiverInputDStream<Status> stream = TwitterUtils.createStream(ssc); 

        JavaDStream<String> lines = stream.map(new Function<Status, String>() { 

            private static final long serialVersionUID = 1L; 

            @Override 
            public String call(Status v1) throws Exception { 
                return v1.getText(); 
            } 
        }); 

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { 
            private static final long serialVersionUID = 1L; 

            @Override 
            public Iterable<String> call(String x) { 
                return Arrays.asList(x.split(" ")); 
            } 
        }); 

        words.print(); 

        ssc.start(); 
        ssc.awaitTermination(); 
    } 
} 

Here is the full log:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 
15/06/17 17:56:35 INFO SparkContext: Running Spark version 1.4.0 
15/06/17 17:56:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
15/06/17 17:56:40 INFO SecurityManager: Changing view acls to: Ravitej.Somayajula 
15/06/17 17:56:40 INFO SecurityManager: Changing modify acls to: Ravitej.Somayajula 
15/06/17 17:56:40 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Ravitej.Somayajula); users with modify permissions: Set(Ravitej.Somayajula) 
15/06/17 17:56:41 INFO Slf4jLogger: Slf4jLogger started 
15/06/17 17:56:41 INFO Remoting: Starting remoting 
15/06/17 17:56:41 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:49952] 
15/06/17 17:56:41 INFO Utils: Successfully started service 'sparkDriver' on port 49952. 
15/06/17 17:56:41 INFO SparkEnv: Registering MapOutputTracker 
15/06/17 17:56:41 INFO SparkEnv: Registering BlockManagerMaster 
15/06/17 17:56:41 INFO DiskBlockManager: Created local directory at C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\blockmgr-d3a2b9f4-19c8-4b11-a4bb-13c83dba4a92 
15/06/17 17:56:41 INFO MemoryStore: MemoryStore started with capacity 969.6 MB 
15/06/17 17:56:41 INFO HttpFileServer: HTTP File server directory is C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\httpd-061c66c1-bcef-4e21-8eaa-7d034405400b 
15/06/17 17:56:41 INFO HttpServer: Starting HTTP Server 
15/06/17 17:56:42 INFO Utils: Successfully started service 'HTTP file server' on port 49953. 
15/06/17 17:56:42 INFO SparkEnv: Registering OutputCommitCoordinator 
15/06/17 17:56:42 INFO Utils: Successfully started service 'SparkUI' on port 4040. 
15/06/17 17:56:42 INFO SparkUI: Started SparkUI at http://10.25.170.67:4040 
15/06/17 17:56:43 INFO SparkContext: Added JAR F:\SparkWorkspace\spark-basics-java/target/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar at http://10.25.170.67:49953/jars/spark-basics-0.0.1-SNAPSHOT-jar-with-dependencies.jar with timestamp 1434544003003 
15/06/17 17:56:43 INFO Executor: Starting executor ID driver on host localhost 
15/06/17 17:56:44 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 49972. 
15/06/17 17:56:44 INFO NettyBlockTransferService: Server created on 49972 
15/06/17 17:56:44 INFO BlockManagerMaster: Trying to register BlockManager 
15/06/17 17:56:44 INFO BlockManagerMasterEndpoint: Registering block manager localhost:49972 with 969.6 MB RAM, BlockManagerId(driver, localhost, 49972) 
15/06/17 17:56:44 INFO BlockManagerMaster: Registered BlockManager 
15/06/17 17:56:45 INFO ReceiverTracker: ReceiverTracker started 
15/06/17 17:56:45 INFO ForEachDStream: metadataCleanupDelay = -1 
15/06/17 17:56:45 INFO FlatMappedDStream: metadataCleanupDelay = -1 
15/06/17 17:56:45 INFO MappedDStream: metadataCleanupDelay = -1 
15/06/17 17:56:45 INFO TwitterInputDStream: metadataCleanupDelay = -1 
15/06/17 17:56:45 INFO TwitterInputDStream: Slide time = 1000 ms 
15/06/17 17:56:45 INFO TwitterInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 
15/06/17 17:56:45 INFO TwitterInputDStream: Checkpoint interval = null 
15/06/17 17:56:45 INFO TwitterInputDStream: Remember duration = 1000 ms 
15/06/17 17:56:45 INFO TwitterInputDStream: Initialized and validated org.apache.spark.streaming.twitter.TwitterInputDStream@... 
15/06/17 17:56:45 INFO MappedDStream: Slide time = 1000 ms 
15/06/17 17:56:45 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
15/06/17 17:56:45 INFO MappedDStream: Checkpoint interval = null 
15/06/17 17:56:45 INFO MappedDStream: Remember duration = 1000 ms 
15/06/17 17:56:45 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@... 
15/06/17 17:56:45 INFO FlatMappedDStream: Slide time = 1000 ms 
15/06/17 17:56:45 INFO FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
15/06/17 17:56:45 INFO FlatMappedDStream: Checkpoint interval = null 
15/06/17 17:56:45 INFO FlatMappedDStream: Remember duration = 1000 ms 
15/06/17 17:56:45 INFO FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@... 
15/06/17 17:56:45 INFO ForEachDStream: Slide time = 1000 ms 
15/06/17 17:56:45 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 
15/06/17 17:56:45 INFO ForEachDStream: Checkpoint interval = null 
15/06/17 17:56:45 INFO ForEachDStream: Remember duration = 1000 ms 
15/06/17 17:56:45 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@... 
Exception in thread "main" java.lang.ExceptionInInitializerError 
    at scala.collection.parallel.ParIterableLike$class.$init$(ParIterableLike.scala:166) 
    at scala.collection.parallel.mutable.ParArray.<init>(ParArray.scala:58) 
    at scala.collection.parallel.mutable.ParArray$.wrapOrRebuild(ParArray.scala:702) 
    at scala.collection.parallel.mutable.ParArray$.handoff(ParArray.scala:700) 
    at scala.collection.mutable.ArrayBuffer.par(ArrayBuffer.scala:74) 
    at org.apache.spark.streaming.DStreamGraph.start(DStreamGraph.scala:49) 
    at org.apache.spark.streaming.scheduler.JobGenerator.startFirstTime(JobGenerator.scala:188) 
    at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:94) 
    at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73) 
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588) 
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586) 
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:610) 
    at com.pearson.illuminati.java.spark.JavaStreamingCount.main(JavaStreamingCount.java:64) 
Caused by: java.lang.NumberFormatException: Not a version: 
    at scala.util.PropertiesTrait$class.parts$1(Properties.scala:176) 
    at scala.util.PropertiesTrait$class.isJavaAtLeast(Properties.scala:180) 
    at scala.util.Properties$.isJavaAtLeast(Properties.scala:16) 
    at scala.collection.parallel.package$.getTaskSupport(package.scala:45) 
    at scala.collection.parallel.package$.<init>(package.scala:48) 
    at scala.collection.parallel.package$.<clinit>(package.scala) 
    ... 13 more 
15/06/17 17:56:45 INFO ReceiverTracker: Starting 1 receivers 
15/06/17 17:56:45 INFO SparkContext: Starting job: start at JavaStreamingCount.java:64 
15/06/17 17:56:45 INFO DAGScheduler: Got job 0 (start at JavaStreamingCount.java:64) with 1 output partitions (allowLocal=false) 
15/06/17 17:56:45 INFO DAGScheduler: Final stage: ResultStage 0(start at JavaStreamingCount.java:64) 
15/06/17 17:56:45 INFO DAGScheduler: Parents of final stage: List() 
15/06/17 17:56:45 INFO DAGScheduler: Missing parents: List() 
15/06/17 17:56:45 INFO DAGScheduler: Submitting ResultStage 0 (ParallelCollectionRDD[0] at start at JavaStreamingCount.java:64), which has no missing parents 
java.lang.NullPointerException 
    at org.xerial.snappy.OSInfo.translateOSNameToFolderName(OSInfo.java:140) 
    at org.xerial.snappy.OSInfo.getOSName(OSInfo.java:107) 
    at org.xerial.snappy.OSInfo.getNativeLibFolderPathForCurrentOS(OSInfo.java:103) 
    at org.xerial.snappy.SnappyLoader.findNativeLibrary(SnappyLoader.java:284) 
    at org.xerial.snappy.SnappyLoader.loadNativeLibrary(SnappyLoader.java:163) 
    at org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:145) 
    at org.xerial.snappy.Snappy.<clinit>(Snappy.java:47) 
    at org.apache.spark.io.SnappyCompressionCodec.<init>(CompressionCodec.scala:150) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) 
    at org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60) 
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) 
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80) 
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) 
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
15/06/17 17:56:45 INFO TaskSchedulerImpl: Cancelling stage 0 
15/06/17 17:56:45 INFO DAGScheduler: ResultStage 0 (start at JavaStreamingCount.java:64) failed in Unknown s 
15/06/17 17:56:45 INFO DAGScheduler: Job 0 failed: start at JavaStreamingCount.java:64, took 0.131996 s 
Exception in thread "Thread-27" org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.lang.reflect.InvocationTargetException 
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) 
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) 
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) 
java.lang.reflect.Constructor.newInstance(Constructor.java:526) 
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:68) 
org.apache.spark.io.CompressionCodec$.createCodec(CompressionCodec.scala:60) 
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$setConf(TorrentBroadcast.scala:73) 
org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:80) 
org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34) 
org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:62) 
org.apache.spark.SparkContext.broadcast(SparkContext.scala:1289) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:874) 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815) 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 

    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:884) 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:815) 
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:799) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1419) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1411) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
15/06/17 17:56:45 INFO SparkContext: Invoking stop() from shutdown hook 
15/06/17 17:56:45 INFO SparkUI: Stopped Spark web UI at http://10.25.170.67:4040 
15/06/17 17:56:45 INFO DAGScheduler: Stopping DAGScheduler 
15/06/17 17:56:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
15/06/17 17:56:45 INFO Utils: path = C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9\blockmgr-d3a2b9f4-19c8-4b11-a4bb-13c83dba4a92, already present as root for deletion. 
15/06/17 17:56:45 INFO MemoryStore: MemoryStore cleared 
15/06/17 17:56:45 INFO BlockManager: BlockManager stopped 
15/06/17 17:56:45 INFO BlockManagerMaster: BlockManagerMaster stopped 
15/06/17 17:56:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
15/06/17 17:56:45 INFO SparkContext: Successfully stopped SparkContext 
15/06/17 17:56:45 INFO Utils: Shutdown hook called 
15/06/17 17:56:45 INFO Utils: Deleting directory C:\Users\ravitej.somayajula\AppData\Local\Temp\spark-55bd21ca-e955-4509-8780-d0d1385d36e9 
15/06/17 17:56:45 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 

My project's pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 

    <modelVersion>4.0.0</modelVersion> 
    <groupId>spark-illuminati</groupId> 
    <artifactId>spark-basics</artifactId> 
    <version>0.0.1-SNAPSHOT</version> 

    <dependencies> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-core_2.10</artifactId> 
      <version>1.4.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-sql_2.10</artifactId> 
      <version>1.4.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming-twitter_2.10</artifactId> 
      <version>1.4.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.apache.spark</groupId> 
      <artifactId>spark-streaming_2.10</artifactId> 
      <version>1.4.0</version> 
     </dependency> 
     <dependency> 
      <groupId>org.twitter4j</groupId> 
      <artifactId>twitter4j-core</artifactId> 
      <version>[4.0,)</version> 
     </dependency> 
    </dependencies> 

    <build> 
     <plugins> 
      <plugin> 
       <artifactId>maven-assembly-plugin</artifactId> 
       <configuration> 
        <descriptorRefs> 
         <descriptorRef>jar-with-dependencies</descriptorRef> 
        </descriptorRefs> 
        <archive> 
         <manifest> 
          <mainClass></mainClass> 
         </manifest> 
        </archive> 
       </configuration> 
       <executions> 
        <execution> 
         <id>make-assembly</id> 
         <phase>package</phase> 
         <goals> 
          <goal>single</goal> 
         </goals> 
        </execution> 
       </executions> 

      </plugin> 
      <plugin> 
       <groupId>org.apache.maven.plugins</groupId> 
       <artifactId>maven-compiler-plugin</artifactId> 
       <version>3.3</version> 
       <configuration> 
        <source>1.7</source> 
        <target>1.7</target> 
        <compilerVersion>1.7</compilerVersion> 
       </configuration> 
      </plugin> 
     </plugins> 
     <resources> 
      <resource> 
       <directory>src/main/resources</directory> 
       <excludes /> 
      </resource> 
     </resources> 
    </build> 
</project> 
Comments:

It looks like Scala cannot determine the Java version... Which version are you using? Maybe they are not compatible?

I have added my pom.xml. I did not add Scala as a dependency; do I need to?

Answer


There appears to have been a version conflict between the Spark, spark-streaming-twitter, and Scala versions, which I was able to resolve successfully.
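
The answer does not spell out the exact change, but given the pom.xml above, one plausible source of such a conflict is the open-ended twitter4j range: spark-streaming-twitter_2.10 1.4.0 was built against the twitter4j 3.0.x line, while <version>[4.0,)</version> pulls in twitter4j 4.x, whose API is not binary-compatible with it. A sketch of the dependency pinned to the connector's own line (the exact patch version here is an assumption):

    <dependency> 
        <groupId>org.twitter4j</groupId> 
        <artifactId>twitter4j-core</artifactId> 
        <!-- Pin to the 3.0.x line that spark-streaming-twitter_2.10 1.4.0 
             was built against, instead of the open range [4.0,) --> 
        <version>3.0.3</version> 
    </dependency> 

Alternatively, the explicit twitter4j-core dependency can simply be removed, so that Maven resolves whatever twitter4j version spark-streaming-twitter declares transitively.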
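
Independently of the dependency versions, the "Not a version:" message itself has a direct explanation in the code above. scala.util.Properties.isJavaAtLeast (visible in the stack trace) reads java.specification.version from the JVM system properties, and System.setProperties(props) replaces the entire system-properties table with just the four OAuth keys, so that lookup comes back empty. The later NullPointerException in org.xerial.snappy.OSInfo points to the same cause: os.name is gone as well. A minimal sketch of setting the credentials without discarding the existing properties (the twitter4j.-prefixed names are the keys twitter4j reads from system properties):

    // Set each key individually; System.setProperty adds to the existing 
    // property table instead of replacing it, so java.specification.version 
    // and os.name survive. 
    System.setProperty("twitter4j.oauth.consumerKey", "<my-key>"); 
    System.setProperty("twitter4j.oauth.consumerSecret", "<my-key>"); 
    System.setProperty("twitter4j.oauth.accessToken", "<my-key>"); 
    System.setProperty("twitter4j.oauth.accessTokenSecret", "<my-key>"); 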