2016-12-29 16 views
1

я приходить потоковых данные следующегоСпарка сократить

id, date, value 
i1, 12-01-2016, 10 
i2, 12-02-2016, 20 
i1, 12-01-2016, 30 
i2, 12-05-2016, 40 

Хочет сократить идентификатор, чтобы получить совокупную информацию стоимости по дате, как

продукция требуется от РДА для данного идентификатора и списка (дней 365) я должен поместить значение в позиции списка на основе день года, как 12-01-2016 составляет 336 и так как есть два экземпляра для устройства i1 с той же даты, когда они должны быть объединены

id, List [0|1|2|3|...    |336| 337| |340| |365] 
i1,        |10+30|  - this goes to 336 position 

i2,          20  40 -- this goes to 337 and 340 position 

Для этого уменьшите или измените группу.

+0

Is Is Spark Streaming или Structured Streaming? что ты уже испробовал? Где проблема? –

+0

Проблема - это обновление списка «на лету» и как уменьшить Do Если я уменьшу по id, все значения будут агрегированы независимо от дня года –

+1

Какой код у вас уже есть? Это Spark Streaming? –

ответ

0

Я дам вам базовый фрагмент кода с несколькими предположениями, поскольку вы не указали язык, источник данных или формат данных.

JavaDStream<String> lineStream = //Your data source for stream 
JavaPairDStream<String, Long> firstReduce = lineStream.mapToPair(line -> { 
    String[] fields = line.split(","); 
    String idDate = fields[0] + fields[1]; 
    Long value = Long.valueOf(fields[2]); 
    return new Tuple2<String, Long>(idDate, value); 
}).reduceByKey((v1, v2) -> { 
    return (v1+v2); 
}); 
firstReduce.map(idDateValueTuple -> { 
    String idDate = idDateValueTuple._1(); 
    Long valueSum = idDateValueTuple._2(); 
    String id = idDate.split(",")[0]; 
    String date = idDate.split(",")[]; 
    //TODO parse date and put the sumValue in array as you wish 
} 
+0

Извините, забыл упомянуть, что использую scala –

+0

Не имеет значения. Такой же код можно легко преобразовать в scala – code

+0

Я не думаю, что приведенная выше логика будет работать для случая с id = id2, поскольку дата отличается для каждой записи для id2 в данных образца. – Phoenix

0

Может только достичь этого. Не знаю, как добавить каждый элемент массива на последнем шаге. Надеюсь, это поможет !!! Если вы получите последний шаг или какой-либо альтернативный способ, оцените, если вы разместите его здесь!

def getDateDifference(dateStr:String):Int = { 
val startDate = "01-01-2016" 
val formatter = DateTimeFormatter.ofPattern("MM-dd-yyyy") 
val oldDate = LocalDate.parse(startDate, formatter) 
val currentDate = dateStr 
val newDate = LocalDate.parse(currentDate, formatter) 
return newDate.toEpochDay().toInt - oldDate.toEpochDay().toInt 
} 
def getArray(numberofDays:Int,data:Int):Iterable[Int] = { 
val daysArray = new Array[Int](366) 
daysArray(numberofDays) = data 
return daysArray 
} 
val idRDD = <read from stream> 
val idRDDMap = idRDD.map { rec => ((rec.split(",")(0),rec.split(",")(1)), 
     (getDateDifference(rec.split(",")(1)),rec.split(",")(2).toInt))} 
val idRDDconsiceMap = idRDDMap.map { rec => (rec._1._1,getArray(rec._2._1, rec._2._2)) } 
val finalRDD = idRDDconsiceMap.reduceByKey((acc,value)=>(???add each element of the arrays????)) 

 Смежные вопросы

  • Нет связанных вопросов^_^