2016-12-16 13 views
3

У меня возникла проблема в эти дни, я пытаюсь читать из нескольких файлов с помощью обжига и создавать вывод с одним файлом. Мой код заключается в следующем:Прочитайте несколько файлов, используя обжиг и выведите SINGLE-файл

def getFilesSource (paths: Seq[String]) = { 
    new MultipleTextLineFiles(paths: _*) { 
     override protected def createHdfsReadTap(hdfsMode: Hdfs): Tap[JobConf, _, _] = { 
     val taps = goodHdfsPaths(hdfsMode).toList.map { 
      path => CastHfsTap (new Hfs (hdfsScheme, path, sinkMode)) 
     } 

     taps.size match { 
      case 0 => { 
      CastHfsTap (new Hfs(hdfsScheme, hdfsPaths.head, sinkMode)) 
      } 
      case 1 => taps.head 
      case _ => new ScaldingMultiSourceTap(taps) 
     } 
     } 
    } 
    } 

Но когда я запускаю этот код, он разбивает свой выход на большое количество файлов, но данные внутри очень мало: всего лишь несколько К. Вместо этого я хочу, чтобы иметь возможность объединить все выходные файлы в один.

Мой обжигающий код:

val source = getFilesSource(mapped) // where mapped is a Sequence of valid HDFS paths (Seq [String]) 

TypedPipe.from(source).map(a => Try{ 
    val json = JSON.parseObject(a) 
    (json.getInteger("prop1"), json.getInteger("prop2"), json.getBoolean("prop3")) 
}.toOption).filter(a => a.nonEmpty) 
    .map(a => a.get) 
    .filter(a => !a._3) 
    .map (that => MyScaldingType (that._1, that._2)) 
    .write(MyScaldingType.typedSink(typedArgs)) 

Я предполагаю, что я должен переопределить «sourceConfInit» метод типа ScaldingMultiSourceTap, но я не знаю, что написать внутри ...

ответ

0

Вы можете использовать groupAll для отправки всех выходов карты (задание - только задание карты) на один редуктор, учитывая, что данные малы, а затем выполните запись. Вывод будет записан в один файл.

. 
. 
. 
.filter(a => !a._3) 
.map (that => MyScaldingType (that._1, that._2)) 
.groupAll 
.write(MyScaldingType.typedSink(typedArgs)) 
+0

Hi @karthikcru, спасибо за ответ, звучит многообещающе. Я попробую сегодня утром. Надеюсь, что, поскольку я выполняю фильтр на фазе карты (используя оператор: .filter (a =>! A._3)), для моего бизнес-случая будет много данных, которые не пройдут эти критерии фильтра. оставшийся материал будет отправлен на единственный редуктор. –