2017-02-20 25 views
1

Я попытался решить прикладную проблему cogroup. но я действительно не знаю, что ...Как использовать применяемый метод cogroup с функциональной работой в искровом режиме?

Если есть два RDD с разными ключами, как в приведенном ниже примере, возможно ли извлечь действительный data1 только тогда, когда первое слово совпадает с cogroup?

val data1 = sc.parallelize(Seq(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3), ("c", 1))) 
val data2 = sc.parallelize(Seq(("a", 3), ("b", 5))) 

val cogroupRdd: RDD[(String, (Iterable[Int], Iterable[Int]))] = data1.cogroup(data2) 
/* List(
    (ba,(CompactBuffer(2),CompactBuffer())), 
    (bc,(CompactBuffer(2),CompactBuffer())), 
    (a,(CompactBuffer(),CompactBuffer(3))), 
    (b,(CompactBuffer(3),CompactBuffer(5))), 
    (c,(CompactBuffer(1),CompactBuffer())), 
    (aa,(CompactBuffer(1),CompactBuffer())) 
) */ 

Результат должен быть Array(("aa", 1), ("ba", 2), ("bc", 2), ("b", 3))

Я решил эту проблему с помощью broadcast() как сказал @mrsrinivas. Но broadcast() не подходит для больших данных.

val bcast = sc.broadcast(data2.map(_._1).collect()) 
val result = data1.filter(r => bcast.value.contains(myFuncOper(r._1))) 

Есть ли способ решить эту проблему, используя cogroup с функциональной операцией?

ответ

1

Вы можете использовать cogroup после извлечения ключа, который будет соответствовать data2 «S ключи, а затем использовать filter и map, чтобы удалить значения без спичек и„реструктуризировать“данные:

val result: RDD[(String, Int)] = data1 
    .keyBy(_._1.substring(0, 1)) // key by first character 
    .cogroup(data2) 
    .filter { case (_, (_, data2Values)) => data2Values.nonEmpty } 
    .flatMap { case (_, (data1Values, _)) => data1Values } 
+0

изменения порядка сортировки, но это ответ, который я разыскивается. Спасибо за ваш ответ! –

+0

Не могли бы вы ответить на еще один вопрос? Если вы не возражаете? Если я использовал 'keyBy' для больших данных, это займет много времени. Есть ли другая альтернатива? –

+0

Я не вижу более быстрой альтернативы - вам нужно будет создать ключ для cogroup, вы можете сделать это с помощью 'map', но это займет столько же времени. –

1

Short:

val result = data1 
    .flatMap(x => x._1.split("").map(y => (y, x))) 
    .join(data2) 
    .map(x => x._2._1) 
    .distinct 

Подробно:

flatMap(x => x._1.split("").map(y => (y, x))) держит

List(
    (a, (aa, 1)), 
    (a, (aa, 1)), 
    (b, (ba, 2)), 
    (a, (ba, 2)), 
    (b, (bc, 2)), 
    (c, (bc, 2)), 
    (b, (b, 3)), 
    (c, (c, 1)) 
) 

после join(data2)

List(
    (a, ((aa, 1), 3)), 
    (a, ((aa, 1), 3)), 
    (a, ((ba, 2), 3)), 
    (b, ((ba, 2), 5)), 
    (b, ((bc, 2), 5)), 
    (b, ((b, 3), 5)) 
) 

Теперь все мы заинтересованы в различных 2-первых пар, которые могут быть выполнены с помощью map(x => x._2._1).distinct

+0

Спасибо за ваш ответ еще раз! –