2016-11-09 8 views
0

здесь Я передаю данные из потокового каталога и записываю его в выходное местоположение. Я также пытаюсь реализовать процесс перемещения файлов hdfs из папки ввода в потоковый каталог. Этот шаг происходит один раз до начала потокового контекста. Но я хочу, чтобы этот шаг выполнялся каждый раз для каждой партии Dstream. что это возможно?как перемещать файлы внутри моего искрового потокового приложения

val streamed_rdd = ssc.fileStream[LongWritable, Text, TextInputFormat](streaming_directory, (t:Path)=> true , true).map { case (x, y) => (y.toString) } 
    streamed_rdd.foreachRDD(rdd => { 
     rdd.map(x =>x.split("\t")).map(x => x(3)).foreachPartition { partitionOfRecords => 
     val connection: Connection = connectionFactory.createConnection() 
     connection.setClientID("Email_send_module_client_id") 
     println("connection started with active mq") 
     val session: Session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE) 
     println("created session") 
     val dest = session.createQueue("dwEmailsQueue2") 
     println("destination queue name = dwEmailsQueue2") 
     val prod_queue = session.createProducer(dest) 
     connection.start() 
     partitionOfRecords.foreach { record => 
      val rec_to_send: TextMessage = session.createTextMessage(record) 
      println("started creating a text message") 
      prod_queue.send(rec_to_send) 
      println("sent the record") 
     } 
     connection.close() 
     } 
    } 
    ) 
    **val LIST = scala.collection.mutable.MutableList[String]() 
    val files_to_move = scala.collection.mutable.MutableList[String]() 
    val cmd = "hdfs dfs -ls -d "+load_directory+"/*" 
    println(cmd) 
    val system_time = System.currentTimeMillis 
    println(system_time) 
    val output = cmd.!! 
    output.split("\n").foreach(x => x.split(" ").foreach(x => if (x.startsWith("/user/hdpprod/")) LIST += x)) 
    LIST.foreach(x => if (x.toString.split("/").last.split("_").last.toLong < system_time) files_to_move += x) 
    println("files to move" +files_to_move) 
    var mv_cmd :String = "hdfs dfs -mv " 
    for (file <- files_to_move){ 
     mv_cmd += file+" " 
    } 
    mv_cmd += streaming_directory 
    println(mv_cmd) 
    val mv_output = mv_cmd.!! 
    println("moved the data to the folder")** 
    if (streamed_rdd.count().toString == "0") { 
     println("no data in the streamed list") 
    } else { 
     println("saving the Dstream at "+System.currentTimeMillis()) 
     streamed_rdd.transform(rdd => {rdd.map(x => (check_time_to_send+"\t"+check_time_to_send_utc+"\t"+x))}).saveAsTextFiles("/user/hdpprod/temp/spark_streaming_output_sent/sent") 
    } 
    ssc.start() 
    ssc.awaitTermination() 
    } 
} 

ответ

0

Я попытался сделать то же самое в реализации Java, как показано ниже. вы можете вызвать этот метод из foreachPartion on rdd

public static void moveFiles(final String moveFilePath, 
      final JavaRDD rdd) { 
      for (final Partition partition : rdd.partitions()) { 
       final UnionPartition unionPartition = (UnionPartition) partition; 
       final NewHadoopPartition newHadoopPartition = (NewHadoopPartition) 
        unionPartition.parentPartition(); 
       final String fPath = newHadoopPartition.serializableHadoopSplit() 
        .value().toString(); 
       final String[] filespaths = fPath.split(":"); 

       if ((filespaths != null) && (filespaths.length > 0)) { 
        for (final String filepath : filespaths) { 
         if ((filepath != null) && filepath.contains("/")) { 
          final File file = new File(filepath); 

          if (file.exists() && file.isFile()) { 
           try { 
            File destFile = new File(moveFilePath + "/" + 
              file.getName()); 

            if (destFile.exists()) { 
             destFile = new File(moveFilePath + "/" + 
               file.getName() + "_"); 
            } 

            java.nio.file.Files.move((file 
              .toPath()), destFile.toPath(), 
             StandardCopyOption.REPLACE_EXISTING); 


           } catch (Exception e) { 
            logger.error(
             "Exception while moving file", 
             e); 
           } 
          } 
         } 
        } 
       } 
      } 

     }