2016-10-11 3 views
0

Я хочу иметь логику в моем combineByKey/reduceByKey/foldByKey, который полагается на ключ, который в настоящее время работает. Из того, что я могу сказать по сигнатурам метода, единственными параметрами, переданными этим методам, являются значения, которые комбинируются/сводятся/складываются.Почему я не могу ссылаться на ключ в логике сокращения?

Используя простой пример, в котором я просто РД, который является (int, int) кортежами, то результат я хочу это рдд ключа по tuple[0] где значению является int ближе всего к ключу.

Например:

(1, 8) 
(1, 3) 
(1, -1) 
(2, 4) 
(2, 5) 
(2, 2) 
(3, 2) 
(3, 4) 

должны сводиться к:

(1, 3) 
(2, 2) 
(3, 2) 

Примечание в сравнении (1, 3) и (1, -1) я не волнует, какой из них выбрали, так как они оба на том же расстоянии. То же самое для клавиши «3».

Как я предположил бы, что делать это было бы что-то вдоль линий:

rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2) 

Но функция reduce только принимает 2 аргумента: два значения, которые необходимо объединить. Похоже, что самым простым методом было бы ссылаться на ключ в моем редукторе, чтобы достичь моей цели; Это возможно?

Если я пытаюсь это я получаю сообщение об ошибке:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect() 

TypeError:() takes exactly 3 arguments (2 given)

Я не действительно ищет решение этой проблемы, например. Мне интересно, есть ли причина, по которой ключ не передается функции reduceByKey? Я предполагаю, что это какой-то базовый принцип философии сокращения карты, которую я пропускаю.


Примечание я могу решить мой пример, вставив шаг карты, которая отображает каждое значение в кортеж, состоящий из величины и расстояния от ключа:

rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])]))) 
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap() 

ответ

0

Я думаю, что нет серьезных оснований не пропускать ключи.
однако, я чувствую, что reduceByKey API был разработан для общего использования - вычислить сумму значений для каждого ключа. До сих пор я никогда не нуждался в ключах в вычислении стоимости. Но это только мое мнение.

Также проблема, которую вы решили, кажется простой проблемой агрегации. min() и groupByKey могут найти ответ. Я знаю, что вы не ищете решение, но вот как я напишу.

from pyspark import SparkContext 

sc = SparkContext() 
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)]) 
reduced = rdd.groupByKey().map(lambda (k, v): (k, min(v, key=lambda e:abs(e-k)))) 
print(reduced.collectAsMap()) 

Результат

{1: 3, 2: 2, 3: 2} 
+0

Хороший ответ. Очень возможно, что реальный ответ на мой вопрос просто «потому что это не API». Но я все равно думал об этом. – FGreg