2016-12-13 6 views
0

Я пытаюсь прочитать файл, используя программу Scala SparkStreaming. Файл хранится в каталоге на моем локальном компьютере и пытается записать его как новый файл на моем локальном компьютере. Но всякий раз, когда я пишу свой поток и храню его как паркет, я получаю пустые папки.Как читать файл с использованием искрообразования и писать в простой файл с помощью Scala?

Это мой код:

Logger.getLogger("org").setLevel(Level.ERROR) 
val spark = SparkSession 
      .builder() 
      .master("local[*]") 
      .appName("StreamAFile") 
      .config("spark.sql.warehouse.dir", "file:///C:/temp") 
      .getOrCreate() 


import spark.implicits._    
val schemaforfile = new StructType().add("SrNo",IntegerType).add("Name",StringType).add("Age",IntegerType).add("Friends",IntegerType) 

val file = spark.readStream.schema(schemaforfile).csv("C:\\SparkScala\\fakefriends.csv") 

file.writeStream.format("parquet").start("C:\\Users\\roswal01\\Desktop\\streamed") 

spark.stop() 

Есть ли что-нибудь не хватает с моим кодом или что-нибудь, который, в котором я пошло не так?

Я также пробовал читать этот файл из местоположения hdfs, но тот же код заканчивается тем, что не создает никаких выходных папок на моих hdf-страницах.

ответ

3

вы здесь ошибка:

val file = spark.readStream.schema(schemaforfile).csv("C:\\SparkScala\\fakefriends.csv") 

CSV() функция должна иметь путь к каталогу в качестве аргумента. Она будет сканировать этот каталог и читать все новые файлы, если они будут перемещены в этот каталог

Для контрольных точек, вы должны добавить

.option("checkpointLocation", "path/to/HDFS/dir") 

Например:

val query = file.writeStream.format("parquet") 
    .option("checkpointLocation", "path/to/HDFS/dir") 
    .start("C:\\Users\\roswal01\\Desktop\\streamed") 

query.awaitTermination() 
+0

'file.writeStream.parquet («C: \\ Users \\ roswal01 \\ Desktop \\ streamed»). Start() 'здесь паркет, кажется, дает ошибку, он показывает, что он не является членом DataStreamWriter. Я пропустил импорт любых пакетов? –

+0

Он также дает ошибку контрольной точки. 'checkpointLocation должен указываться либо через опцию (« checkpointLocation », ...), либо SparkSession.conf.set (« spark.sql.streaming.checkpointLocation », ...);' –

+0

@ Документация RohanOswal кажется устаревшей - Прости! Я удалил функцию паркета() из ответа. Я также добавил информацию, как сделать контрольную точку, она была - и, вероятно, по-прежнему - не всегда требуется, и именно поэтому я не включил ее. –