Я пытаюсь прочитать файл, используя программу Scala SparkStreaming. Файл хранится в каталоге на моем локальном компьютере и пытается записать его как новый файл на моем локальном компьютере. Но всякий раз, когда я пишу свой поток и храню его как паркет, я получаю пустые папки.Как читать файл с использованием искрообразования и писать в простой файл с помощью Scala?
Это мой код:
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.master("local[*]")
.appName("StreamAFile")
.config("spark.sql.warehouse.dir", "file:///C:/temp")
.getOrCreate()
import spark.implicits._
val schemaforfile = new StructType().add("SrNo",IntegerType).add("Name",StringType).add("Age",IntegerType).add("Friends",IntegerType)
val file = spark.readStream.schema(schemaforfile).csv("C:\\SparkScala\\fakefriends.csv")
file.writeStream.format("parquet").start("C:\\Users\\roswal01\\Desktop\\streamed")
spark.stop()
Есть ли что-нибудь не хватает с моим кодом или что-нибудь, который, в котором я пошло не так?
Я также пробовал читать этот файл из местоположения hdfs, но тот же код заканчивается тем, что не создает никаких выходных папок на моих hdf-страницах.
'file.writeStream.parquet («C: \\ Users \\ roswal01 \\ Desktop \\ streamed»). Start() 'здесь паркет, кажется, дает ошибку, он показывает, что он не является членом DataStreamWriter. Я пропустил импорт любых пакетов? –
Он также дает ошибку контрольной точки. 'checkpointLocation должен указываться либо через опцию (« checkpointLocation », ...), либо SparkSession.conf.set (« spark.sql.streaming.checkpointLocation », ...);' –
@ Документация RohanOswal кажется устаревшей - Прости! Я удалил функцию паркета() из ответа. Я также добавил информацию, как сделать контрольную точку, она была - и, вероятно, по-прежнему - не всегда требуется, и именно поэтому я не включил ее. –