2017-02-07 5 views
0

Мое требование состоит в том, чтобы разделить большой файл (содержащий миллионы записей) на 100 тыс. Файлов записей и распространить их на 3 папки. Я использовал Java для разделения файлов и весной интеграции для их распространения. Во время процесса требуется много времени, чтобы распространять файлы в папки. Добавьте файлы конфигурации ниже. Является ли использование весеннего интеграционного моста правильным для этой цели или существует какой-либо оптимизированный способ? Мне нужно улучшить этот код для чтения и записи файлов в ведра s3 вместо локальных каталогов в будущем. Будут ли поддерживаемые s3 входящие/исходящие адаптеры поддерживать одну и ту же цель?Разделение и распределение файлов с использованием интеграции с весной

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:integration="http://www.springframework.org/schema/integration" 
xmlns:file="http://www.springframework.org/schema/integration/file" 
xsi:schemaLocation="http://www.springframework.org/schema/beans 
     http://www.springframework.org/schema/beans/spring-beans.xsd 
     http://www.springframework.org/schema/integration 
     http://www.springframework.org/schema/integration/spring-integration.xsd 
     http://www.springframework.org/schema/integration/file 
     http://www.springframework.org/schema/integration/file/spring-integration-file.xsd"> 


<file:inbound-channel-adapter id="filesIn1" 
    directory="/tmp/test"> 
    <integration:poller id="poller" fixed-delay="5000" /> 
</file:inbound-channel-adapter> 

<file:outbound-channel-adapter id="filesOut1" 
    directory="/tmp/output" delete-source-files="true" /> 

<integration:service-activator 
    input-channel="filesIn1" output-channel="filesOut1" ref="handler" /> 


<bean id="handler" class="com.test.Handler" /> 


<file:inbound-channel-adapter id="filesIn2" 
    directory="/tmp/output" filename-pattern="stack1*"> 
    <integration:poller id="poller1" fixed-delay="5000" /> 
</file:inbound-channel-adapter> 
<file:outbound-channel-adapter id="filesOut2" 
    directory="/tmp/stack1" delete-source-files="true" /> 
<integration:bridge input-channel="filesIn2" 
     output-channel="filesOut2" /> 


<file:inbound-channel-adapter id="filesIn3" 
    directory="/tmp/output" filename-pattern="stack2*"> 
    <integration:poller id="poller2" fixed-delay="5000"/> 
</file:inbound-channel-adapter> 
<file:outbound-channel-adapter id="filesOut3" 
    directory="/tmp/stack2" delete-source-files="true" /> 
<integration:bridge input-channel="filesIn3" 
     output-channel="filesOut3" /> 


     <file:inbound-channel-adapter id="filesIn4" 
    directory="/tmp/output" filename-pattern="stack3*"> 
    <integration:poller id="poller3" fixed-delay="5000" /> 
</file:inbound-channel-adapter> 
<file:outbound-channel-adapter id="filesOut4" 
    directory="/tmp/stack3" delete-source-files="true" /> 
<integration:bridge input-channel="filesIn4" 
     output-channel="filesOut4" /> 

Handler.java

import java.io.BufferedReader; 
import java.io.BufferedWriter; 
import java.io.DataInputStream; 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.FileWriter; 
import java.io.InputStreamReader; 
import java.nio.file.Files; 
import java.nio.file.Path; 
import java.nio.file.Paths; 
import java.util.List; 

    public class Handler { 
    public void handleFile(File input) { 
     System.out.println("Copying file: " + input.getAbsolutePath()); 

     try { 

      Path p = Paths.get(input.getAbsolutePath()); 
      List<String> lines = Files.readAllLines(p); 

      int count = lines.size(); 
      System.out.println("Lines in the file: " + count); 
      lines.size();// Source File Name. 
      Long nol = 100000L; // No. of lines to be split and saved in each 
      int stackcount = 0; 

      Long temp = (count/nol); 
      Long temp1 = (Long) temp; 

      Long nof = 0L; 
      if (temp1 == temp) { 
       nof = temp1; 
      } 
      else { 
       nof = temp1 + 1; 
      } 
      System.out.println("No. of files to be generated :" + nof); // Displays 
                     // no. 
                     // of 
                     // files 
                     // to be 
                     // generated. 

      // --------------------------------------------------------------------------------------------------------- 

      // Actual splitting of file into smaller files 

      FileInputStream fstream = new FileInputStream(input.getAbsolutePath()); 
      DataInputStream in = new DataInputStream(fstream); 

      BufferedReader br = new BufferedReader(new InputStreamReader(in)); 
      String strLine; 

      for (int j = 1; j <= nof; j++) { 

       if (stackcount < 3) { 
        stackcount = stackcount + 1; 
       } 
       else { 
        stackcount = 1; 
       } 

       FileWriter fstream1 = new FileWriter("/tmp/output/stack" + stackcount + "-" + j + ".dat"); // Destination 
                              // File 
                              // Location 
       BufferedWriter out = new BufferedWriter(fstream1); 

       for (int i = 1; i <= nol; i++) { 
        strLine = br.readLine(); 
        if (strLine != null) { 
         out.write(strLine); 
         if (i != nol) { 
          out.newLine(); 
         } 
        } 
       } 
       out.close(); 
      } 
      in.close(); 

     } 
     catch (Exception e) { 
      System.err.println("Error: " + e.getMessage()); 
     } 

    } 

} 
+0

Прежде всего, с помощью 'Files.readAllLines (р)' для подсчета количества строк в файле (как вы сказали - миллион записей) ужасно. – Andremoniy

ответ

0

Является ли использование пружинного интегрального моста правильным для этой цели или есть какой-либо оптимизированный способ?

Вы можете пойти этим путем. Все нормально. Но если перейти к атрибуту channel на входящих и исходящих адаптеров каналов вам не нужно, что <bridge>

Будут s3 входящие/исходящие адаптеры поддерживают ту же цель?

Исправить. Они действительно обеспечивают аналогичную функциональность, но для протокола AWS S3.

Примечание: ваш конфиг не ясно, и похоже, что это не связано с вопросом ...

0

Как сказал Andremoniy в комментарии на вопрос, используя Files.readAllLines не то, что вы на самом деле хотите, - по-видимому, ваша память не может удержать столько в один раз.

Так вместо этого, почему бы не попробовать это:

try(Stream<String> allLines = Files.lines(path)) { 
    Iterator<String> reader = allLines.iterator(); 

    int splitBatch = 10000; // however much you need. 
    int lineCount = 0; 
    int batchNumber = 1; 
    FileWriter out = getBatchOut(batchNumber); // replace this with what you need. 
    while(reader.hasNext() && lineCount < splitBatch) { 
    if (lineCount == splitBatch) { 
     out.flush(); out.close(); 
     out = getBatchOut(++batchNumber); // next batch 
     lineCount = 0; 
    } 
    out.write(reader.next()); 
    lineCount++; 
    } 
} 

Примечание я не включал обработку исключений в моем примере кода. Вы всегда должны помнить о том, чтобы освобождать все ресурсы при возникновении исключения. В моем примере, out писатель всегда должен быть закрыт, чтобы не вводить утечки памяти. Я оставлю это вам, как это сделать правильно и когда.