2016-02-23 1 views
0

У меня есть два RDD, содержащих информацию о времени. RDD разделены на разные разделы. Один имеет видАгрегация одного RDD в соответствии со значением другого RDD Spark (Java)

16:00:00 
16:00:18 
16:00:25 
16:01:01 
16:01:34 
16:02:12 
16:02:42 
... 

и другой, содержащий промежуток времени в виде tuple2

<16:00:00, 16:00:59> 
<16:01:00, 16:01:59> 
<16:02:00, 16:02:59> 
... 

мне нужно объединить первый и второй RDD, путем суммирования значения первой в соответствии со значениями во-вторых, для того, чтобы получить что-то вроде

<<16:00:00, 16:00:59>, [16:00:00,16:00:18,16:00:25]> 
<<16:01:00, 16:01:59>, [16:01:01,16:01:34]> 
<<16:02:00, 16:02:59>, [16:02:12,16:02:42]> 
... 

Или в альтернативе, что-то вроде

<<16:00:00, 16:00:59>, 16:00:00> 
<<16:00:00, 16:00:59>, 16:00:18> 
<<16:00:00, 16:00:59>, 16:00:25> 
<<16:01:00, 16:01:59>, 16:01:01> 
<<16:01:00, 16:01:59>, 16:01:34> 
<<16:02:00, 16:02:59>, 16:02:12> 
<<16:02:00, 16:02:59>, 16:02:42> 
... 

Я пытаюсь использовать весь спектр функций преобразования искры, но мне трудно найти тот, который работает на RDD такой разной природы. Я знаю, что могу пойти на продукт cartesian, а затем фильтровать, но мне бы хотелось «лучшее» решение. Я пробовал zipPartition, что может сработать, но я могу иметь несогласованность в разделах, например. 16:00:00 может оказаться в разделе, где соответствующее значение агрегации (кортеж <16:00:00, 16:00:59>) отсутствует. Каков наилучший способ справиться с этим?

PS: Я использую Java, но решения Scala также приветствуются. Спасибо

+1

ли интервалы всегда регулярно? – zero323

+0

Не обязательно – McKracken

ответ

1

Я упростил нижеследующее, чтобы использовать ints, но я считаю, что то же самое можно сделать раз. Хотя примеры в Scala, я подозреваю, что все это можно сделать и на Java.

Если диапазоны являются регулярными, я бы повернул «значения» RDD в range,value, затем выполнил простое соединение.

val values = Seq(1, 5, 10, 14, 20) 
val valuesRdd = sc.parallelize(values, 2) 
valuesRdd.map(x => (((x/10)*10, ((x/10)*10)+9), x)).collect 

Однако если диапазоны не являются регулярными, то:

Если вы не возражаете, используя DataFrames то вариант будет использовать определенную функцию пользовательских создать столбец, основанный на V, если в в данном диапазоне и присоединиться к этому.

case class Range(low : Int, high :Int) 
val ranges = Seq(Range(0,9), Range(10,19), Range(20,29)); 
val rangesDf = sc.parallelize(ranges, 2).toDF 

case class Value(value : Int) 
val values = Seq(Value(1), Value(5), Value(10), Value(14), Value(20)) 
val valuesDf = sc.parallelize(values, 2).toDF 

val inRange = udf{(v: Int, low: Int, high : Int) => v >= low && v<= high} 

rangesDf.join(valuesDf, inRange(valuesDf("value"), rangesDf("low"), rangesDf("high"))).show 

Следующий вариант был бы взрываются из диапазонов и присоединиться на разобранную версию:

val explodedRange = rangesRdd.map(x => (x, List.range(x._1, x._2 + 1))).flatMap({ case (range, lst) => lst.map { x => (x, range)} }) 
val valuesRdd = sc.parallelize(values, 2).map(x => (x,true)) 
valuesRdd.join(explodedRange).map(x => (x._2._2, x._1)).collect 
+0

Я пытаюсь оставить использование DataFrame в качестве последнего решения. Ваш последний вариант кажется выполнимым, но все же мне нужно будет взорвать диапазоны каждую секунду, значительно увеличив количество данных (вероятно, все же лучше, чем «декартовое» преобразование). Я ошибаюсь, если я говорю, что я могу использовать 'leftOuterJoin' в третьей строке? И, наконец, мои объекты, скорее всего, являются объектами календаря (или похожими): что использует Spark при сравнении индексов (например, в 'join')? Использует ли он метод compareTo? – McKracken

+0

Если у вас есть случаи, когда 'значение' может не существовать в любом' диапазоне', то использование 'leftOuterJoin' гарантирует, что вы получите это значение. Я считаю, что Spark использует .equals при сравнении ключей в соединении. –

+0

Я пробую третий вариант, но, видимо, есть некоторые проблемы с соединением на последнем шаге. Я использую класс MyDate для представления даты, так что я могу переопределить равные (мне это нужно).Индексы в 'valuesRdd' и в' explodedRange' являются экземплярами MyDate. Equals фактически используется при сравнении индексов в соединении, но, судя по всему, объединение не сравнивает каждый индекс в 'valuesRdd' с каждым индексом в' explodedRange'. Что меня пугает больше всего, так это то, что в разных прогонах объединение возвращает разные результаты (rdd из 0, 1 или 2 записей, тогда как они должны быть 695). Используется один раздел. – McKracken

 Смежные вопросы

  • Нет связанных вопросов^_^