2017-02-23 138 views
2

У меня проблема с данными временных рядов. При сбоях питания в наборе данных отсутствуют некоторые временные метки. Мне нужно заполнить эти пробелы, добавив строки, и после этого я могу интерполировать отсутствующие значения.Заполнение пробелов во временных рядах Spark

Входные данные:

periodstart    usage 
--------------------------------- 
2015-09-11 02:15   23000 
2015-09-11 03:15   23344 
2015-09-11 03:30   23283 
2015-09-11 03:45   23786 
2015-09-11 04:00   25039 

Требуются выход:

periodstart    usage 
--------------------------------- 
2015-09-11 02:15   23000 
2015-09-11 02:30   0 
2015-09-11 02:45   0 
2015-09-11 03:00   0 
2015-09-11 03:15   23344 
2015-09-11 03:30   23283 
2015-09-11 03:45   23786 
2015-09-11 04:00   25039 

Теперь я зафиксировал это с петлей в то время как функции набора данных Еогеасп. Проблема в том, что я должен сначала собрать набор данных для драйвера, прежде чем я смогу сделать цикл while. Так что это не правильный путь для Искры.

Может ли кто-нибудь дать мне лучшее решение?

это мой код:

MissingMeasurementsDS.collect().foreach(row => { 
    // empty list for new generated measurements 
    val output = ListBuffer.empty[Measurement] 
    // Missing measurements 
    val missingMeasurements = row.getAs[Int]("missingmeasurements") 
    val lastTimestamp = row.getAs[Timestamp]("previousperiodstart") 
    //Generate missing timestamps 
    var i = 1 
    while (i <= missingMeasurements) { 
    //Increment timestamp with 15 minutes (900000 milliseconds) 
    val newTimestamp = lastTimestamp.getTime + (900000 * i) 
    output += Measurement(new Timestamp(newTimestamp), 0)) 
    i += 1 
    } 
    //Join interpolated measurements with correct measurements 
    completeMeasurementsDS.join(output.toDS()) 
}) 
completeMeasurementsDS.show() 
println("OutputDF count = " + completeMeasurementsDS.count()) 
+0

RHeutz Вы можете вставить фрагмент кода для добавления недостающих значений здесь? –

ответ

3

Если вход DataFrame имеет следующую структуру:

root 
|-- periodstart: timestamp (nullable = true) 
|-- usage: long (nullable = true) 

Определить мин/макс:

val (minp, maxp) = df 
    .select(min($"periodstart").cast("bigint"), max($"periodstart".cast("bigint"))) 
    .as[(Long, Long)] 
    .first 

Установить шаг, например, в течение 15 минут :

val step: Long = 15 * 60 

Сформировать референсный диапазон:

val reference = spark 
    .range((minp/step) * step, ((maxp/step) + 1) * step, step) 
    .select($"id".cast("timestamp").alias("periodstart")) 

Регистрация и заполнить пробелы:

reference.join(df, Seq("periodstart"), "leftouter").na.fill(0, Seq("usage"))