2017-01-04 13 views
1

Как обрабатывать пустые разделы в mapPartitionsWithIndex?spark mapPartitionsWithIndex обработка пустых разделов

Полный пример можно найти: https://gist.github.com/geoHeil/6a23d18ccec085d486165089f9f430f2

Моей цель состоит в том, чтобы заполнить значения нан с последней хорошей известной величиной через РД как улучшение Spark/Scala: fill nan with last good observation.

Но некоторые разделы не содержат никакого значения:

###################### carry 
Map(2 -> None, 5 -> None, 4 -> None, 7 -> Some(FooBar(2016-01-04,lastAssumingSameDate)), 1 -> Some(FooBar(2016-01-01,first)), 3 -> Some(FooBar(2016-01-02,second)), 6 -> None, 0 -> None) 
(2,None) 
(5,None) 
(4,None) 
(7,Some(FooBar(2016-01-04,lastAssumingSameDate))) 
(1,Some(FooBar(2016-01-01,first))) 
(3,Some(FooBar(2016-01-02,second))) 
(6,None) 
(0,None) 
() 
###################### carry 

case class FooBar(foo: Option[Date], bar: String) 
val myDf = Seq(("2016-01-01", "first"), ("2016-01-02", "second"), 
    ("2016-wrongFormat", "noValidFormat"), 
    ("2016-01-04", "lastAssumingSameDate")) 
    .toDF("foo", "bar") 
    .withColumn("foo", 'foo.cast("Date")) 
    .as[FooBar] 
def notMissing(row: Option[FooBar]): Boolean = row.isDefined && row.get.foo.isDefined 
myDf.rdd.filter(x => notMissing(Some(x))).count 
val toCarry: Map[Int, Option[FooBar]] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => Iterator((i, iter.filter(x => notMissing(Some(x))).toSeq.lastOption)) }.collectAsMap 

При использовании

val toCarryBd = spark.sparkContext.broadcast(toCarry) 
def fill(i: Int, iter: Iterator[FooBar]): Iterator[FooBar] = { 
    if (iter.isEmpty) { 
     iter 
    } else { 
     var lastNotNullRow: Option[FooBar] = toCarryBd.value.get(i).get 
     iter.map(foo => { 
     println("original ", foo) 
     if (!notMissing(Some(foo))) { 
      println("replaced") 
      // this will go into the default case 
      // FooBar(lastNotNullRow.getOrElse(FooBar(Option(Date.valueOf("2016-01-01")), "DUMMY")).foo, foo.bar) 
      FooBar(lastNotNullRow.get.foo, foo.bar) // TODO warning this throws an error 
     } else { 
      lastNotNullRow = Some(foo) 
      foo 
     } 
     }) 
    } 
    } 

    val imputed: RDD[FooBar] = myDf.rdd.mapPartitionsWithIndex { case (i, iter) => fill(i, iter) } 

для заполнения значений это приведет к краху.

выход при применении ввода из ответа. До сих пор не 100% там

+----------+--------------------+ 
|  foo|     bar| 
+----------+--------------------+ 
|2016-01-01|    first| 
|2016-01-02|    second| 
|2016-01-04|  noValidFormat| 
|2016-01-04|lastAssumingSameDate| 
+----------+--------------------+ 

ответ

1

Что касается обработки пустых разделов при работе mapPartitions (и подобный), общий подход является возвращение пустого итератора правильного типа, когда у вас есть пустой входной итератор.

Похоже, что ваш код делает это, однако кажется, что у вас, вероятно, есть ошибка в вашей логике приложения (а именно, предполагается, что если в разделе отсутствует запись, у нее будет либо предыдущая строка в тот же раздел, который хорош или что предыдущий раздел не пуст и имеет хорошую строку, что необязательно имеет место). Вы частично исправили эту проблему, пройдя через и для каждого раздела, собирающего последнее предыдущее хорошее значение, а затем, если у вас нет хорошего значения в начале раздела, найдите значение в собранном массиве.

Однако, если это также происходит в то же время, что предыдущий раздел пуст, вам нужно будет перейти к предыдущему предыдущему значению раздела до тех пор, пока не найдете тот, который вы ищете. (Обратите внимание, что предполагается, что первая запись в вашем наборе данных действительна, если это не ваш код, который все равно будет терпеть неудачу).

Ваше решение действительно близко к работе, но имеет некоторые незначительные допущения, которые не всегда обязательно сохраняются.

+0

Спасибо за прекрасный комментарий. Это помогает мне заполнить следующую, но не последнюю известную ценность. –

+0

Правильно, вам просто нужно искать назад, чтобы сделать последнее хорошо известно. – Holden

+0

Вы имеете в виду здесь вместо +1 a -1: 'while (lastNotNullRow == None) { lastNotNullRow = toCarryBd.value.get (i + 1) .get }'? Но это не сработает, если первый раздел пуст (как в этом случае), я думаю, что замена карты уже в правильном порядке. –