Я попытался решить прикладную проблему cogroup
. но я действительно не знаю, что ...Как использовать применяемый метод cogroup с функциональной работой в искровом режиме?
Если есть два RDD с разными ключами, как в приведенном ниже примере, возможно ли извлечь действительный data1
только тогда, когда первое слово совпадает с cogroup
?
val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1)))
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5)))
val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2)
/* List(
(ba,(CompactBuffer(2),CompactBuffer())),
(bc,(CompactBuffer(2),CompactBuffer())),
(a,(CompactBuffer(),CompactBuffer(3))),
(b,(CompactBuffer(3),CompactBuffer(5))),
(c,(CompactBuffer(1),CompactBuffer())),
(aa,(CompactBuffer(1),CompactBuffer()))
) */
Результат должен быть Array(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3))
Я решил эту проблему с помощью broadcast()
как сказал @mrsrinivas. Но broadcast()
не подходит для больших данных.
val bcast = sc.broadcast(data2.map(_._1).collect())
val result = data1.filter(r => bcast.value.contains(myFuncOper(r._1)))
Есть ли способ решить эту проблему, используя cogroup
с функциональной операцией?
изменения порядка сортировки, но это ответ, который я разыскивается. Спасибо за ваш ответ! –
Не могли бы вы ответить на еще один вопрос? Если вы не возражаете? Если я использовал 'keyBy' для больших данных, это займет много времени. Есть ли другая альтернатива? –
Я не вижу более быстрой альтернативы - вам нужно будет создать ключ для cogroup, вы можете сделать это с помощью 'map', но это займет столько же времени. –