
Running Spark 1.6.2 (YARN mode): Spark Streaming textFileStream watching the output of RDD.saveAsTextFile

First of all, I have some code from this post to get filenames within Spark Streaming, so that could be the problem, but hopefully not.

Basically, I have this first job:

import org.apache.spark.SparkContext 
import org.apache.spark.streaming.{StreamingContext, Seconds} 
import org.apache.spark.streaming.dstream.DStream 
import org.apache.hadoop.io.{LongWritable, Text} 
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat 

// watch dir for new files and return their contents as lines of text 
def getStream(ssc: StreamingContext, dir: String): DStream[String] = { 
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir).map(_._2.toString) 
} 

val sc = SparkContext.getOrCreate 
val ssc = new StreamingContext(sc, Seconds(5)) 

val inputDir = "hdfs:///tmp/input" 
val outputDir = "hdfs:///tmp/output1" 

val stream1 = getStream(ssc, inputDir) 
stream1.foreachRDD(rdd => rdd.saveAsTextFile(outputDir)) 

ssc.start() 
ssc.awaitTermination() 

And I also have a second job that, for this example, looks practically identical; it just swaps inputDir and outputDir around and uses a new outputDir = "hdfs:///tmp/output2".
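
For reference, here is a minimal sketch of that second job (my own reconstruction; it assumes the second application simply reuses the same getStream helper with the directories swapped):

// second streaming application: watch the first job's output directory 
val sc2 = SparkContext.getOrCreate 
val ssc2 = new StreamingContext(sc2, Seconds(5)) 

val stream2 = getStream(ssc2, "hdfs:///tmp/output1") 
stream2.foreachRDD(rdd => rdd.saveAsTextFile("hdfs:///tmp/output2")) 

ssc2.start() 
ssc2.awaitTermination() 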

Anyway, I have to start the second streaming job before the first one, because it needs to be watching for new files. Makes sense...

Then I start the first job and hadoop fs -copyFromLocal some files into the input directory, since, according to the API docs,

The files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
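
As an aside, here is a sketch (mine, not from the post, with a made-up staging path) of what "moving" a file into the watched directory within the same HDFS could look like, using the Hadoop FileSystem API:

import org.apache.hadoop.conf.Configuration 
import org.apache.hadoop.fs.{FileSystem, Path} 

// write the file elsewhere on the same HDFS first, then rename it into the 
// monitored directory so it shows up there in one step 
val fs = FileSystem.get(new Configuration()) 
fs.rename(new Path("hdfs:///tmp/staging/data.txt"), new Path("hdfs:///tmp/input/data.txt")) 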

When I try to run this, it eventually crashes with a stack trace that contains this:

17/02/01 11:48:35 INFO FileInputDStream: Finding new files took 7 ms 
17/02/01 11:48:35 INFO FileInputDStream: New files at time 1485949715000 ms: 
hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS 
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 355.9 KB, free 356.8 KB) 
17/02/01 11:48:35 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 28.9 KB, free 385.7 KB) 
17/02/01 11:48:35 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:43097 (size: 28.9 KB, free: 511.1 MB) 
17/02/01 11:48:35 INFO SparkContext: Created broadcast 1 from fileStream at FileStreamTransformer.scala:45 
17/02/01 11:48:35 ERROR JobScheduler: Error generating jobs for time 1485949715000 ms 
org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/output1/_SUCCESS 
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323) 
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265) 
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387) 
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) 
    at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276) 
    at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266) 
    at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:251) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
Exception in thread "main" org.apache.hadoop.mapreduce.lib.input.InvalidInputException: Input path does not exist: hdfs://sandbox.hortonworks.com:8020/tmp/output1/_SUCCESS 
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:323) 
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.listStatus(FileInputFormat.java:265) 
    at org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits(FileInputFormat.java:387) 
    at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:120) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:242) 
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:240) 
    at scala.Option.getOrElse(Option.scala:120) 
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:240) 
    at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:276) 
    at org.apache.spark.streaming.dstream.FileInputDStream$$anonfun$4.apply(FileInputDStream.scala:266) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.streaming.dstream.FileInputDStream.org$apache$spark$streaming$dstream$FileInputDStream$$filesToRDD(FileInputDStream.scala:266) 
    at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:153) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) 
    at scala.collection.immutable.List.foreach(List.scala:318) 
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) 
    at scala.collection.AbstractTraversable.map(Traversable.scala:105) 
    at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351) 
    at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346) 
    at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344) 
    at scala.Option.orElse(Option.scala:257) 
    at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341) 
    at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115) 
    at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251) 
    at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105) 
    at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:253) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:251) 
    at scala.util.Try$.apply(Try.scala:161) 
    at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:251) 
    at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:182) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:88) 
    at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87) 
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) 
17/02/01 11:48:35 INFO StreamingContext: Invoking stop(stopGracefully=false) from shutdown hook 
17/02/01 11:48:35 INFO JobGenerator: Stopping JobGenerator immediately 
17/02/01 11:48:35 INFO RecurringTimer: Stopped timer for JobGenerator after time 1485949715000 
17/02/01 11:48:35 INFO JobGenerator: Stopped JobGenerator 
17/02/01 11:48:35 INFO JobScheduler: Stopped JobScheduler 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/streaming,null} 
17/02/01 11:48:35 INFO StreamingContext: StreamingContext stopped successfully 
17/02/01 11:48:35 INFO SparkContext: Invoking stop() from shutdown hook 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/batch/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/streaming/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null} 
17/02/01 11:48:35 INFO ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null} 
17/02/01 11:48:35 INFO SparkUI: Stopped Spark web UI at http://172.17.0.2:4040 
17/02/01 11:48:35 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 
17/02/01 11:48:35 INFO MemoryStore: MemoryStore cleared 
17/02/01 11:48:35 INFO BlockManager: BlockManager stopped 
17/02/01 11:48:35 INFO BlockManagerMaster: BlockManagerMaster stopped 
17/02/01 11:48:35 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 
17/02/01 11:48:35 INFO SparkContext: Successfully stopped SparkContext 
17/02/01 11:48:35 INFO ShutdownHookManager: Shutdown hook called 
17/02/01 11:48:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-85bb28ad-e3e1-4b2a-8795-04ac1c6a0ea5 
17/02/01 11:48:35 INFO ShutdownHookManager: Deleting directory /tmp/spark-85bb28ad-e3e1-4b2a-8795-04ac1c6a0ea5/httpd-65e6e9f0-dcb8-4b66-86f6-f775e2e497c0 
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon. 
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports. 
17/02/01 11:48:35 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down. 

And I know that the _SUCCESS file is written by rdd.saveAsTextFile, so that isn't the issue, but my questions are:

  1. The file does exist. I can see it with hadoop fs -ls.
  2. Even if the file didn't exist, the API is meant to pick up new files, so why is it being read at all?
  3. The file is empty, so why process it anyway?
  4. Is this even possible? Can Spark Streaming watch the output of another Spark job?

Answer


To explicitly enforce that only new files are processed, and to ensure that touch files like _SUCCESS are skipped, we can use the following signature of fileStream:

def getStream(ssc: StreamingContext, dir: String): DStream[String] = { 
    ssc.fileStream[LongWritable, Text, TextInputFormat](dir, 
     // skip marker/hidden files such as _SUCCESS and anything starting with "." 
     (path: org.apache.hadoop.fs.Path) => !path.getName.startsWith("_") && !path.getName.startsWith("."), 
     newFilesOnly = true) 
     .map(_._2.toString) 
} 

newFilesOnly defaults to true when it is not specified, as shown here. So ideally _SUCCESS should not have been processed in your setup either.
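
As a quick sanity check (my own sketch; the keepFile helper is made up and not part of the answer), the predicate used in the filter above should skip the marker file from the log while letting ordinary part files through:

// standalone check of the same predicate used in the filter above 
def keepFile(name: String): Boolean = !name.startsWith("_") && !name.startsWith(".") 

keepFile("_SUCCESS")   // false -> skipped 
keepFile(".tmp")       // false -> skipped 
keepFile("part-00000") // true  -> processed 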


I just noticed that parameter while reading over the API methods. I have yet to try it. Side question: is there any side effect of 'newFilesOnly = false' other than picking up existing files? I'm getting tired of moving files in and out of HDFS to test this. –


Also, I think I want to keep skipping dot-files, so maybe something like 'getName.matches("^[^_.]")'? –


@cricket_007 yes.. I have updated it in my answer, but did not use a regex... :) If you use a regex, you will need to escape the dot properly. –
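
For completeness, a sketch of the regex variant discussed in these comments (the pattern and value name are illustrative only, not taken from the answer):

// keep only files whose names do not start with "_" or "." 
// (String.matches is anchored, so the pattern must cover the whole name) 
val keepByRegex = (path: org.apache.hadoop.fs.Path) => path.getName.matches("[^_.].*") 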