Я получаю сообщение об ошибке при запуске ниже написанного кода SPARK. Я пытаюсь найти сумму всех векторов на основе ключа. Каждая строка ввода начинается с ключа (целое число), а затем 127 чисел с плавающей запятой, который представляет собой один вектор, имеющий 127 измерений, т.е. каждая строка начинается с ключа и вектора.Ошибка ввода кода искры
from cStringIO import StringIO
class testing:
def __str__(self):
file_str = StringIO()
for n in self.vector:
file_str.write(str(n))
file_str.write(" ")
return file_str.getvalue()
def __init__(self,txt="",initial=False):
self.vector = [0.0]*128
if len(txt)==0:
return
i=0
for n in txt.split():
if i<128:
self.vector[i]=float(n)
i = i+1
continue
self.filename=n
break
def addVec(self,r):
a = testing()
for n in xrange(0,128):
a.vector[n] = self.vector[n] + r.vector[n]
return a
def InitializeAndReturnPair(string,first=False):
vec = testing(string,first)
return 1,vec
from pyspark import SparkConf, SparkContext
conf = (SparkConf()
.setMaster("local")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
inp = sc.textFile("input.txt")
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache()
output.saveAsTextFile("output")
print output.reduceByKey(lambda a,b : a).collect()
Пример строки в input.txt
6,0 156,0 26,0 3,0 1,0 0,0 2,0 1,0 15,0 113,0 53,0 139,0 156,0 0,0 0,0 0,0 156,0 29,0 1,0 38,0 59,0 0,0 0,0 0,0 28,0 4,0 2,0 9,0 1,0 0,0 0,0 0,0 9,0 83,0 13,0 1,0 0,0 9,0 42,0 7,0 41,0 71,0 74,0 123,0 35,0 17,0 7,0 2,0 156,0 27,0 6,0 33,0 11,0 2,0 0,0 11,0 35,0 4,0 2,0 4,0 1,0 3,0 2,0 4,0 0,0 0,0 0,0 0,0 2,0 19,0 45,0 17,0 47,0 2,0 2,0 7,0 59,0 90,0 15,0 11,0 156,0 14,0 1,0 4,0 9,0 11,0 2,0 29,0 35,0 6,0 5,0 9,0 4,0 2,0 1,0 3,0 1,0 0,0 0,0 0,0 1,0 5,0 25,0 14,0 27,0 2,0 0,0 2,0 86,0 48,0 10,0 6,0 156,0 23,0 1,0 2,0 21,0 6,0 0,0 3,0 31,0 10,0 4,0 3,0 0,0 0,0 1,0 2,0
Ниже приведена ошибка, которую я получаю. Эта ошибка происходит из последней строки кода т.е. output.reduceByKey
Сообщение об ошибке - http://pastebin.com/0tqiiJQm
Не совсем уверен, как подойти к этой проблеме. Я попытался использовать MarshalSerializer
, но он дал такую же проблему.
------------------------------ Ответ -------------- ----------------------
У меня есть ответ от apache user list по тому же вопросу. В основном картограф/редуктор, который запускается в кластере оленьей кожи есть определение класса, и мы должны пройти класс, написав класс в другом модуле и приложив при настройке SparkContext с помощью
sc.addPyFile(os.path(HOMEDirectory + "module.py"))
Спасибо всем за помощь вне.
Это решение классно и хорошо работает. Можете ли вы предложить, где бы я ошибся, если я хочу вернуть пользовательский объект класса вместо массива? Я планирую возвращать другие объекты вместе с этим массивом. – siddardha
Причина, по которой вы столкнулись с проблемой, состоит в том, что класс тестирования не является сериализуемым. Посмотрите на документы python для получения дополнительной информации о том, как исправить это https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances . Другой вариант - создать ваши классы после того, как вы собрали выходные данные, например: data = output.reduceByKey (lambda a, b: np.add (a, b)). collect() final_output = [тестирование (значение) для значения в данных] Это будет работать только в том случае, если размер данных достаточно мал, чтобы обрабатываться непараллельно. – Anant
Я опубликовал тот же вопрос в списке пользователей искры Apache, и я получил ответ о том, как использовать пользовательский класс, который я добавлю в самом вопросе для обмена знаниями. Я отвечу на ваш ответ, поскольку он решает проблему, с которой я столкнулся в некоторой степени – siddardha