Я хочу иметь логику в моем combineByKey
/reduceByKey
/foldByKey
, который полагается на ключ, который в настоящее время работает. Из того, что я могу сказать по сигнатурам метода, единственными параметрами, переданными этим методам, являются значения, которые комбинируются/сводятся/складываются.Почему я не могу ссылаться на ключ в логике сокращения?
Используя простой пример, в котором я просто РД, который является (int, int)
кортежами, то результат я хочу это рдд ключа по tuple[0]
где значению является int
ближе всего к ключу.
Например:
(1, 8)
(1, 3)
(1, -1)
(2, 4)
(2, 5)
(2, 2)
(3, 2)
(3, 4)
должны сводиться к:
(1, 3)
(2, 2)
(3, 2)
Примечание в сравнении (1, 3)
и (1, -1)
я не волнует, какой из них выбрали, так как они оба на том же расстоянии. То же самое для клавиши «3».
Как я предположил бы, что делать это было бы что-то вдоль линий:
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2)
Но функция reduce
только принимает 2 аргумента: два значения, которые необходимо объединить. Похоже, что самым простым методом было бы ссылаться на ключ в моем редукторе, чтобы достичь моей цели; Это возможно?
Если я пытаюсь это я получаю сообщение об ошибке:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd.reduceByKey(lambda key, v1, v2: v1 if abs(key - v1) < abs(key - v2) else v2).collect()
TypeError:() takes exactly 3 arguments (2 given)
Я не действительно ищет решение этой проблемы, например. Мне интересно, есть ли причина, по которой ключ не передается функции reduceByKey
? Я предполагаю, что это какой-то базовый принцип философии сокращения карты, которую я пропускаю.
Примечание я могу решить мой пример, вставив шаг карты, которая отображает каждое значение в кортеж, состоящий из величины и расстояния от ключа:
rdd = sc.parallelize([(1, 8), (1, 3), (1, -1), (2, 4), (2, 5), (2, 2), (3, 2), (3, 4)])
rdd = rdd.map(lambda tup: (tup[0], tuple([tup[1], abs(tup[0] - tup[1])])))
rdd.reduceByKey(lambda v1, v2: v1 if v1[1] < v2[1] else v2).mapValues(lambda x: x[0]).collectAsMap()
Хороший ответ. Очень возможно, что реальный ответ на мой вопрос просто «потому что это не API». Но я все равно думал об этом. – FGreg