2016-08-20 11 views
-5

Мои данные могут существовать в РДУ двух формах, либоДля искры, какова самая быстрая структура данных, чтобы уменьшить ее? Карты или списки кортежей?

RDD[Map[String, Map[String, Int]]] 

или

RDD[List[(String, List[(String,Int)])]] 

Как вы можете видеть, во втором примере «отображение» данных предписана первым элементом кортежа является ключ. Рассмотрим две записи в моем rdd, назовите их R1 и R2. Я буду объединять ключи в R1 и R2. Когда R1 и R2 содержат один и тот же ключ, я делаю дальнейшее объединение этих значений. В качестве примера, скажем, что оба R1 и R2 содержат запись

outer_key1 -> (inner_key1 -> 1) 

затем полученное слияние будет производить

outer_key1 -> (inner_key1 -> 2) 

Итак, мой вопрос, какой структура данных быстрее и больше памятей эффективным для искры, чтобы уменьшить внешний и внутренний ключ? Карты карт или списки (key, list_of_tuple). Моя интуиция заключается в том, что карты будут быстрее сокращаться по ключу, учитывая их поиск 0 (1). Однако, учитывая то, как большинство карт реализовано, я уверен, что на карте RDDS есть приличная сумма потерянной памяти.

Как реальный пример такого рода объединения, мои РДУ представляют

Map(email_address->(date->number_of_emails_recieved_that_day)) where each RDD contains many email addresses 
+0

В зависимости от того, хотите ли вы иметь доступ к 'findByKey' или' sequentialAccess' над вашими внутренними данными. –

+0

И ваш RDD является 'RDD [T]' где 'T' является' Map [String, Map [String, Int]] '. Это не 'RDD [(K, T)]', так что вы подразумеваете под 'Key' ?? –

ответ

0

Я думаю, что у вас есть неправильно понимаемые о концепции РДД-х. Вам необходимо преобразовать ваши данные в соответствующую структуру, чтобы использовать возможности RDD.

Итак, вам нужно подумать о том, что вы хотите вычислить для принятия решения о вашем RDD.

В соответствии с моим пониманием вашего вопроса. У вас есть 2 источника данных, и вы хотите объединить данные, полученные вами из этих двух источников данных. Таким образом, вы создаете свои 2 RDD из этих источников, затем вы объединяете их.

// First we will have to create RDD's from our data source.  

// create RDD from source 1 
// Lets say you have a List[(String, List[(String, Int)]] 
val src1 = List(
    ("[email protected]", List(("01/01/2016", 10), ("05/01/2016", 20))) 
    ("[email protected]", List(("01/01/2016", 5), ("06/01/2016", 30)) 
) 

// Now enters spark 
val rddSrc1: RDD[(String, List[String, Int])] = sc.parallelize(src1) 


// create RDD from source 2 
// Lets say you have a Map[(String, Map[String, Int]] 
val src2 = Map(
    "[email protected]" -> Map("01/01/2016" -> 10, "05/01/2016" -> 20) 
    "[email protected]" -> Map("01/01/2016" -> 5, "06/01/2016" -> 30) 
) 

// Now enters spark 
val rddSrc1: RDD[(String, Map[String, Int])] = sc.parallelize(src2.toList) 


// Now since you want to merge on both "email" and "date" lets make ("email", "date") tuple as key. 

rddSrc1T: RDD[(String, String), Int] = rddSrc1 
    .flatMap({ 
    case (email, list) => list.map({ 
     case (date, number) => ((email, date), number) 
    }) 
    }) 

rddSrc2T: RDD[(String, String), Int] = rddSrc1 
    .flatMap({ 
    case (email, map) => map.toList.map({ 
     case (date, number) => ((email, date), number) 
    }) 
    }) 

// now co-group the 2 RDD's 
rddCogroup: RDD[((String, String), Iterable[Int], Iterable[Int])) = rddSrc1T.cogroup(rddSrc2T) 

val totalNumberRdd: RDD[((String, String), Int] = rddCogroup.map({ 
    case ((email, date), iter1, iter2) => ((email, date), iter1.sum + iter2.sum) 
}) 
+0

(ничего не знаешь, но кажется, что ваши имена переменных неверны, например, src1/src2, называемые списком/картой в распараллеливании) – Will

+0

Что? Ваше заявление не ясно. –

+0

yup ... это типичная ошибка. –

 Смежные вопросы

  • Нет связанных вопросов^_^