2017-02-14 13 views
2

У меня есть требование, когда я хочу записывать каждую отдельную запись в RDD в отдельный файл в HDFS.Spark: записывать каждую запись в RDD в отдельные файлы в каталоге HDFS

Я сделал это для нормальной файловой системы, но, очевидно, это не работает для HDFS.

stream.foreachRDD{ rdd => 
    if(!rdd.isEmpty()) { 
     rdd.foreach{ 
      msg => 
      val value = msg._2 
      println(value) 
      val fname = java.util.UUID.randomUUID.toString 
      val path = dir + fname 
      write(path, value) 
     } 
     } 
    } 

где записи это функция, которая записывает в файловую систему.

Есть ли способ сделать это внутри искры, так что для каждой записи я могу изначально записать в HDFS без использования каких-либо других инструментов, таких как Kafka Connect или Flume ??


EDIT: Больше Объяснение

Для например: Если мой DstreamRDD имеет следующие записи,

  • ABCD
  • EFGH
  • IJKL
  • MNOP

Мне нужны разные файлы для каждой записи, поэтому для файла «abcd» используется другой файл, отличный от «efgh» и т. Д.

Я попытался создать RDD внутри streamRDD, но я узнал, что это запрещено, поскольку RDD не сериализуемы.

+0

Не могли бы вы предоставить рабочее решение или принять правильное решение. Это помогает другим людям, которые имеют аналогичную проблему. – Explorer

+0

@LiveAndLetLive Я еще не нашел решение этой проблемы, и, как я упоминал в одном из предыдущих комментариев, мы перешли от хранения записи к хранению всего RDD с несколькими записями. Итак, этот вопрос все еще открыт. –

+0

вы можете использовать свой собственный MultipleTextOutputFormat, см. Этот ответ: https://stackoverflow.com/a/26051042/609597 – softwarevamp

ответ

-1

Вы можете сделать в нескольких способах ..

От РДА, вы можете получить sparkCOntext после того, как вы получили sparkCOntext, вы можете использовать метод распараллеливания и передать строку в Списке String.

Например:

val sc = rdd.sparkContext 
sc.parallelize(Seq("some string")).saveAsTextFile(path) 

Кроме того, вы можете использовать sqlContext, чтобы преобразовать строку в DF затем записать в файл.

для примера:

import sqlContext.implicits._ 
Seq(("some string")).toDF 
+0

Мои данные находятся в пределах rdd, поэтому я не могу просто создать rdd так, как вы указали, поскольку вложенность rdd не является позволил. –

+0

подход shankars кажется мне прав. @BiplobBiswas, что еще вы пробовали, смогли решить? –

+0

@RamGhadiyaram Мы перешли на сохранение всего RDD в HDFS, хотя сохранение отдельных записей в виде отдельных файлов позволило бы решить наши будущие проблемы. –

0

Вы можете принудительно перераспределить ДРР на нет. разделов столько, сколько нет. записей и затем сэкономить

val rddCount = rdd.count() 
rdd.repartition(rddCount).saveAsTextFile("your/hdfs/loc")