2017-02-17 11 views
1

Я пишу программу spark/scala для чтения в ZIP-файлах, разархивировать их и записать содержимое в набор новых файлов. Я могу заставить это работать для записи в локальную файловую систему, но задавался вопросом, есть ли способ записать выходные файлы в распределенную файловую систему, такую ​​как HDFS. Код показан below`Запись на HDFS в Spark/Scala

import java.util.zip.ZipInputStream 
import org.apache.spark.input.PortableDataStream 
import java.io._ 

var i =1 
sc.binaryFiles("file:///d/tmp/zips/").flatMap((file: (String,  PortableDataStream)) => 
    { 


    val zipStream = new ZipInputStream(file._2.open)    
    val entry = zipStream.getNextEntry        
    val iter = scala.io.Source.fromInputStream(zipStream).getLines   

    val fname = f"/d/tmp/myfile$i.txt" 


    i = i + 1 

    val xx = iter.mkString 
    val writer = new PrintWriter(new File(fname)) 
    writer.write(xx) 
    writer.close() 

    iter              
    }).collect() 

`

ответ

3

Вы можете легко записывать данные в HDFS с помощью Hadoop-общих библиотек (если вы используете SBT в зависимости инструмента manangement, добавьте Тата библиотеку к вашей зависимости). При том, что вы можете создать FileSystem объект:

private val fs = { 
    val conf = new Configuration() 
    FileSystem.get(conf) 
    } 

не забудьте настроить FileSystem с информацией Hadoop кластера (ядро-site.xml и т.д.)

Тогда можно написать, например, Строка пути (в вашем случае, вы должны иметь дело с потоками), на HDFS следующим образом:

@throws[IOException] 
    def writeAsString(hdfsPath: String, content: String) { 
    val path: Path = new Path(hdfsPath) 
    if (fs.exists(path)) { 
     fs.delete(path, true) 
    } 
    val dataOutputStream: FSDataOutputStream = fs.create(path) 
    val bw: BufferedWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, "UTF-8")) 
    bw.write(content) 
    bw.close 
    } 
0

вы должны взглянуть на метод saveAsTextFile из официальной документации: http://spark.apache.org/docs/latest/programming-guide.html

Это позволит вам сэкономить на HDFS:

iter.saveAsTextFile("hdfs://...") 
+0

В этом коде он не является RDD, поэтому не может его записать. Возможно, сначала конверсия. – dumitru

+0

Да, я думаю, что актеры будут хороши здесь. RDD должен быть типом данных, чтобы манипулировать искровым светом, чтобы получить распределенные данные по кластеру. – chateaur

+0

Это суть проблемы. Я пробовал все, что мог придумать, чтобы получить данные в моем ядре в RDD, чтобы включить использование saveasTextFile, но придумал короткий. Если кто-то решил это, пожалуйста, дайте мне знать – user2699504

0

Вы можете попробовать saveAsTextFile метод.

Укажите элементы набора данных в виде текстового файла (или набора текстовых файлов) в данном каталоге в локальной файловой системе, HDFS или любой другой поддерживаемой Hadoop файловой системой. Spark вызовет toString для каждого элемента, чтобы преобразовать его в строку текста в файле.

Он сохранит каждый раздел как другой файл. Количество разделов, в котором вы закончите, будет таким же, как количество ваших входных файлов, если вы не перераспределяете или не объединяетесь.

+0

. Пожалуйста, ознакомьтесь с моими комментариями относительно того, почему использование saveasTextFile является проблемой. – user2699504

+0

Невозможно Вы можете написать весь RDD не каждый файл отдельно. вместо сбора использовать saveAsText файл? – NetanelRabinowitz

+0

Это объединяет все распакованные данные для каждого из них в один файл. Я этого не хочу. Я хочу, чтобы каждый распакованный файл был в отдельном файле – user2699504

0
sc.binaryFiles("/user/example/zip_dir", 10)        //make an RDD from *.zip files in HDFS 
      .flatMap((file: (String, PortableDataStream)) => {     //flatmap to unzip each file 
       val zipStream = new ZipInputStream(file._2.open)    //open a java.util.zip.ZipInputStream 
       val entry = zipStream.getNextEntry        //get the first entry in the stream 
       val iter = Source.fromInputStream(zipStream).getLines   //place entry lines into an iterator 
       iter.next              //pop off the iterator's first line 
       iter               //return the iterator 
      }) 
      .saveAsTextFile("/user/example/quoteTable_csv/result.csv")