2017-02-22 53 views
0

Почему я постоянно получаю эту ошибку каждый раз, когда я пытаюсь выполнить действие своего RDD &, как это исправить?Правильное использование flatMap

/databricks/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Я попытался выяснить, который является последним РДД я могу сделать действие, и его ratingByUser, что указывает на то проблема заключается в flatMap.

Что я пытаюсь сделать, так это то, что я беру CSV с (userID, movieID, рейтинг), и я хочу создать уникальные комбинации movieID для каждого идентификатора пользователя с рейтингом, но разные пользователи могут генерировать одну и ту же пару идентификаторов movieID, ех для этого CSV:

1,2000,5

1,2001,2

1,2002,3 ​​

2,2000,4

2,2001, 1

2,2004,5 ​​

Я хочу РДД:

ключ (2000,2001), значение (5,2,1)

ключ (2000,2002), значение (5,3,1)

ключ (2001,2002), значение (2,3,1)

ключ (2000,2001), значение (4,1,1)

ключ (2000,2004), значение (4,5,1)

ключ (2001,2004), значение (1,5,1)

# First Map function - gets line and returns key(userID) value(movieID,rating) 
def parseLine(line): 
    fields=line.split(",") 
    userID=int(fields[0]) 
    movieID=int(fields[1]) 
    rating=int(fields[2]) 
    return userID, (movieID,rating) 
# Function to create movie unique pairs with ratings 
# all pair start with the lowest ID 
# returns key (movieIDj,movieIDi) & value (rating-j,rating-i,1) 
# the 1 in value is added in order to count number of ratings in the reduce 

def createPairs(userRatings): 
    pairs=[] 
    for i1 in range(len(userRatings[1])-1): 
    for i2 in range(i1+1,len(userRatings[1])): 
     if userRatings[i1][0]<userRatings[1][i2][0]: 
     pairs.append(((userRatings[1][i1][0],userRatings[1][i2][0]),(userRatings[1][i1][1],userRatings[1][i2][1],1))) 
     else: 
     pairs.append(((userRatings[1][i2][0],userRatings[1][i1][0]),(userRatings[1][i2][1],userRatings[1][i1][1],1))) 
    return pairs 

# Create SC object from the ratings file 
lines = sc.textFile("/FileStore/tables/dvmlbdnj1487603982330/ratings.csv") 
# Map lines to Key(userID),Value(movieID,rating) 
movieRatings = lines.map(parseLine) 
# Join all rating by same user into one key 
# (UserID1,(movie1,rating1)),(UserID1,(movie2,rating2)) --> UserID1,[(movie1,rating1),(movie2,rating2)] 
ratingsPerUser = movieRatings.groupByKey() 
# activate createPairs func 
# We use flatMap, since each user have different number of ratings --> different number pairs 
pairsOfMovies = ratingsPerUser.flatMap(createPairs) 

ответ

1

Проблема функция передается flatMap не flatMap. Группы по ключевым возвращает итератор:

  • Это не может быть пройдено несколько раз
  • Это не может быть проиндексированы.

Преобразовать в список первых:

ratingsPerUser.mapValues(list).flatMap(createPairs)