2016-12-28 9 views
0

У меня есть RDD, что имеет сигнатуруSpark: Как написать org.apache.spark.rdd.RDD [java.io.ByteArrayOutputStream]

org.apache.spark.rdd.RDD[java.io.ByteArrayOutputStream] 

В этом РДУ, каждая строка имеет свой собственный раздел.

Этот ByteArrayOutputStream представляет собой zip-выход. Я применяю некоторую обработку данных в каждом разделе, и я хочу экспортировать обработанные данные из каждого раздела в виде одного zip-файла. Каков наилучший способ экспортировать каждую строку в конечном RDD как один файл в строке на hdf?

Если вам интересно узнать, как я попал в такой Rdd.

val npyData = transformedTopData.select("tokenIDF", "topLevelId").rdd.repartition(2).mapPartitions(x => { 
     val vectors = for { 
     row <- x 
     } yield { 
     row.getAs[Vector](0) 
     } 
     Seq(ml2npyCSR(vectors.toSeq).zipOut) 
    }.iterator) 

EDIT: Граф отлично работает

scala> npyData.count() 
res9: Long = 2 
+1

вы пробовали 'count' его? Я сомневаюсь, что RDD будет работать. –

+0

@ JacekLaskowski Count отлично работает. обновленный результат, о котором идет речь – vumaasha

+0

Что такое 'ml2npyCSR.zipOut'? –

ответ

0

Я понял, что я должен представить мои данные в PairRDD и реализовать пользовательские FileOutputFormat. Я посмотрел на реализацию SequenceFileOutputFormat для вдохновения и смог написать свою собственную версию на основе этого.

Мой заказ FileOutputFormat доступен here

0

Свеча имеет очень слабую поддержку операций файловой системы. Вам нужно Hadoop FileSystem API для создания отдельных файлов

// This method is needed as Hadoop conf object is not serializable 
def createFileStream(pathStr:String) = { 
    import org.apache.hadoop.conf.Configuration; 
    import org.apache.hadoop.fs.FileSystem; 
    import org.apache.hadoop.fs.Path; 

    val hadoopconf = new Configuration(); 
    val fs = FileSystem.get(hadoopconf); 
    val outFileStream = fs.create(new Path(pathStr)); 
    outFileStream 
} 

// Method writes to individual files. 
// Needs a unique id along with object for output file naming 
def writeToFile(x:(Char, Long)) : Unit = { 
    val (dataStream, id) = x 
    val output_dir = "/tmp/del_a/" 
    val outFileStream = createFileStream(output_dir+id) 
    dataStream.writeTo(outFileStream) 
    outFileStream.close() 
} 


// zipWithIndex used for creating unique id for each item in rdd 
npyData.zipWithIndex().foreach(writeToFile) 

Ссылка:
Hadoop FileSystem example
ByteArrayOutputStream.writeTo(java.io.OutputStream)

+0

Хотя это сработает, я нашел правильный способ сделать это, о чем я объясню в своем ответе ниже – vumaasha

 Смежные вопросы

  • Нет связанных вопросов^_^