здесь Я передаю данные из потокового каталога и записываю его в выходное местоположение. Я также пытаюсь реализовать процесс перемещения файлов hdfs из папки ввода в потоковый каталог. Этот шаг происходит один раз до начала потокового контекста. Но я хочу, чтобы этот шаг выполнялся каждый раз для каждой партии Dstream. что это возможно?как перемещать файлы внутри моего искрового потокового приложения
val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat](streaming_directory, (t:Path)=> true , true).map { case (x, y) => (y.toString) }
streamed_rdd.foreachRDD(rdd => {
rdd.map(x =>x.split("\t")).map(x => x(3)).foreachPartition { partitionOfRecords =>
val connection: Connection = connectionFactory.createConnection()
connection.setClientID("Email_send_module_client_id")
println("connection started with active mq")
val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)
println("created session")
val dest = session.createQueue("dwEmailsQueue2")
println("destination queue name = dwEmailsQueue2")
val prod_queue = session.createProducer(dest)
connection.start()
partitionOfRecords.foreach { record =>
val rec_to_send: TextMessage = session.createTextMessage(record)
println("started creating a text message")
prod_queue.send(rec_to_send)
println("sent the record")
}
connection.close()
}
}
)
**val LIST = scala.collection.mutable.MutableList[String]()
val files_to_move = scala.collection.mutable.MutableList[String]()
val cmd = "hdfs dfs -ls -d "+load_directory+"/*"
println(cmd)
val system_time = System.currentTimeMillis
println(system_time)
val output = cmd.!!
output.split("\n").foreach(x => x.split(" ").foreach(x => if (x.startsWith("/user/hdpprod/")) LIST += x))
LIST.foreach(x => if (x.toString.split("/").last.split("_").last.toLong < system_time) files_to_move += x)
println("files to move" +files_to_move)
var mv_cmd :String = "hdfs dfs -mv "
for (file <- files_to_move){
mv_cmd += file+" "
}
mv_cmd += streaming_directory
println(mv_cmd)
val mv_output = mv_cmd.!!
println("moved the data to the folder")**
if (streamed_rdd.count().toString == "0") {
println("no data in the streamed list")
} else {
println("saving the Dstream at "+System.currentTimeMillis())
streamed_rdd.transform(rdd => {rdd.map(x => (check_time_to_send+"\t"+check_time_to_send_utc+"\t"+x))}).saveAsTextFiles("/user/hdpprod/temp/spark_streaming_output_sent/sent")
}
ssc.start()
ssc.awaitTermination()
}
}