Для проверки я использовал следующий код pyspark воспроизвести ваш RDD
:
from pyspark.mllib.recommendation import Rating
Rec = sc.parallelize([(10000, (Rating(user=10000, product=14780773, rating=7.35695469892999e-05),
Rating(user=10000, product=17229476, rating=5.648606256948921e-05))),
(0, (Rating(user=0, product=16750010, rating=0.04405213492474741),
Rating(user=0, product=17416511, rating=0.019491942665715176))),
(20000, (Rating(user=20000, product=17433348, rating=0.017938298063142653),
Rating(user=20000, product=17333969, rating=0.01505112418739887)))])
Этот RDD состоит из пар ключ-значение, каждое значение, состоящее из записи w с кортежами рейтинга. Вам нужно сопоставить RDD, чтобы сохранить только записи, а затем взорвать результат, чтобы иметь отдельные кортежи для каждой рекомендации. flatMap(f)
функция будет конденсироваться эти два шага, как так:
flatRec = Rec.flatMap(lambda p: p[1])
что приводит к РДУ в виде:
[Rating(user=10000, product=14780773, rating=7.35695469892999e-05),
Rating(user=10000, product=17229476, rating=5.648606256948921e-05),
Rating(user=0, product=16750010, rating=0.04405213492474741),
Rating(user=0, product=17416511, rating=0.019491942665715176),
Rating(user=20000, product=17433348, rating=0.017938298063142653),
Rating(user=20000, product=17333969, rating=0.01505112418739887)]
Теперь все, что нужно, чтобы использовать функцию createDataFrame
, чтобы превратить это в DataFrame , Каждый кортеж рейтинга будет преобразован в строку DataFrame, и поскольку элементы отмечены, вам не нужно указывать схему.
recDF = sqlContext.createDataFrame(flatRec).show()
Этот выход будет следующее:
+-----+--------+--------------------+
| user| product| rating|
+-----+--------+--------------------+
|10000|14780773| 7.35695469892999E-5|
|10000|17229476|5.648606256948921E-5|
| 0|16750010| 0.04405213492474741|
| 0|17416511|0.019491942665715176|
|20000|17433348|0.017938298063142653|
|20000|17333969| 0.01505112418739887|
+-----+--------+--------------------+
Необходимая функция [покрыта в PySpark документы] (https://spark.apache.org/docs/1.5.2/api/python /pyspark.sql.html). Посмотрите 'createDataFrame'. –