2017-02-14 17 views
0

У меня есть RDD рейтинга продукта, используя объект MLlib Rating, который является всего лишь кортежем (int userId, int productId, double rating). Я хочу удалить любой элемент из RDD, который представляет собой обзор продукта с слишком небольшим количеством рейтингов.Фильтр RDD, основанный на количестве вхождений

Например РДД может быть таким:

Rating(35, 1, 5.0) 
Rating(18, 1, 4.0) 
Rating(29, 2, 3.0) 
Rating(12, 2, 2.0) 
Rating(65, 3, 1.0) 

и если я фильтруюсь, что для удаления какого-либо продукта с менее чем 2 обзорами, было бы просто отфильтровать последний рейтинг и вернуть первые четыре. (Я хочу фильтровать с более высоким минимальным количеством просмотров, чем 2, но только для примера).

В настоящее время у меня есть этот код, который выводит последовательность из идентификаторов продуктов в порядке числа оценок, но я не был уверен в пути для фильтрации от основного RDD основан на том, что и он кажется неэффективным в любом случае:

val mostRated = ratings.map(_._2.product) 
         .countByValue 
         .toSeq 
         .sortBy(- _._2) 
         .map(_._1) 

ответ

0

Вы можете группировать рдд по PRODUCTID, а затем фильтровать его на основе, если длина группы больше, чем пороговое значение (1 здесь). Используйте flatMap для извлечения результатов из сгруппированных РДА:

case class Rating(UserId: Int, ProductId: Int, Rating: Double) 

val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0), 
    Rating(18, 1, 4.0), 
    Rating(29, 2, 3.0), 
    Rating(12, 2, 2.0), 
    Rating(65, 3, 1.0))) 

val prodMinCounts = ratings.groupBy(_.ProductId). 
          filter(_._2.toSeq.length > 1). 
          flatMap(_._2) 
prodMinCounts.collect 
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0))