2017-01-11 10 views
0

Я использую функцию readCsvFile (path) в Apache-Flink api, чтобы прочитать CSV-файл и сохранить его в переменной списка. Как это работает, используя несколько потоков? Например, он разбивает файл на основе некоторой статистики? если да, то какая статистика? Или он читает файл по строкам, а затем отправляет строки в потоки для их обработки? Вот пример кода:Как API Apache-Flink читает CSV-файл, используя параллелизм под капотом?

//default parallelism is 4 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
csvPath="data/weather.csv"; 
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath) 
         .types(String.class,Double.class) 
         .collect(); 

полагая, что у нас есть CSV файл 800MB на локальном диске, как это распределить работу между этими 4 потоками?

ответ

1

Метод API readCsvFile() внутри создает источник данных с CsvInputFormat, который основан на FileInputFormat от Flink. Этот InputFormat генерирует список так называемых InputSplits. InputSplit определяет, какой диапазон файла должен быть отсканирован. Затем расщепления распределяются по задачам источника данных.

Итак, каждая параллельная задача сканирует определенную область файла и анализирует его содержимое. Это очень похоже на то, как это делается MapReduce/Hadoop.

+0

Спасибо, Фабиан. Но я хочу знать, как он определяет Splits? По размеру файла? Количество строк или что-то еще? И сначала он читает весь файл, а затем решает, или перед чтением идет разделение? – Ehsan

+0

Для 'CsvInputFormat' файл разделяется по размеру. Чтение файла в одном потоке для его разделения было бы бессмысленным. Поскольку строка может охватывать два разделения, потоки чтения начинаются с первой новой строки, которую он находит, и завершает линию, которая была запущена в ее расщеплении, даже если она пересекает границу разделения. –

+0

Хорошо. Допустим, у нас есть файл 200mb, а параллелизм установлен на 2. Thread1 должен начинаться с самого начала. Thread2 должен начинать чтение примерно с середины файла. Как thread2 узнает это местоположение? Как Thread1 знает, что он достиг конца своей части и должен остановиться? – Ehsan

1

Это то же самое, как How does Hadoop process records split across block boundaries?

извлечь код из FLiNK-релиз-1.1.3 DelimitedInputFormat файл.

// else .. 
    int toRead; 
    if (this.splitLength > 0) { 
     // if we have more data, read that 
     toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength; 
    } 
    else { 
     // if we have exhausted our split, we need to complete the current record, or read one 
     // more across the next split. 
     // the reason is that the next split will skip over the beginning until it finds the first 
     // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the 
     // previous split. 
     toRead = this.readBuffer.length; 
     this.overLimit = true; 
    } 

Совершенно ясно, что если он не читает строки разделителя в одном расколе, он получит еще один раскол найти. (Я не найти соответствующий код, я попробую.)

Plus : ниже показано, как я нахожу код, от readCsvFile() до DelimitedInputFormat.

enter image description here