2014-10-25 2 views
0

Я получаю сообщение об ошибке при запуске ниже написанного кода SPARK. Я пытаюсь найти сумму всех векторов на основе ключа. Каждая строка ввода начинается с ключа (целое число), а затем 127 чисел с плавающей запятой, который представляет собой один вектор, имеющий 127 измерений, т.е. каждая строка начинается с ключа и вектора.Ошибка ввода кода искры


from cStringIO import StringIO 

class testing: 
    def __str__(self): 
     file_str = StringIO() 
     for n in self.vector: 
      file_str.write(str(n)) 
      file_str.write(" ") 
     return file_str.getvalue() 
    def __init__(self,txt="",initial=False): 
     self.vector = [0.0]*128 
     if len(txt)==0: 
      return 
     i=0 
     for n in txt.split(): 
      if i<128: 
       self.vector[i]=float(n) 
       i = i+1 
       continue 
      self.filename=n 
      break 
def addVec(self,r): 
    a = testing() 
    for n in xrange(0,128): 
     a.vector[n] = self.vector[n] + r.vector[n] 
    return a 

def InitializeAndReturnPair(string,first=False): 
    vec = testing(string,first) 
    return 1,vec 


from pyspark import SparkConf, SparkContext 
conf = (SparkConf() 
     .setMaster("local") 
     .setAppName("My app") 
     .set("spark.executor.memory", "1g")) 
sc = SparkContext(conf = conf) 

inp = sc.textFile("input.txt") 
output = inp.map(lambda s: InitializeAndReturnPair(s,True)).cache() 
output.saveAsTextFile("output") 
print output.reduceByKey(lambda a,b : a).collect() 

Пример строки в input.txt

6,0 156,0 26,0 3,0 1,0 0,0 2,0 1,0 15,0 113,0 53,0 139,0 156,0 0,0 0,0 0,0 156,0 29,0 1,0 38,0 59,0 0,0 0,0 0,0 28,0 4,0 2,0 9,0 1,0 0,0 0,0 0,0 9,0 83,0 13,0 1,0 0,0 9,0 42,0 7,0 41,0 71,0 74,0 123,0 35,0 17,0 7,0 2,0 156,0 27,0 6,0 33,0 11,0 2,0 0,0 11,0 35,0 4,0 2,0 4,0 1,0 3,0 2,0 4,0 0,0 0,0 0,0 0,0 2,0 19,0 45,0 17,0 47,0 2,0 2,0 7,0 59,0 90,0 15,0 11,0 156,0 14,0 1,0 4,0 9,0 11,0 2,0 29,0 35,0 6,0 5,0 9,0 4,0 2,0 1,0 3,0 1,0 0,0 0,0 0,0 1,0 5,0 25,0 14,0 27,0 2,0 0,0 2,0 86,0 48,0 10,0 6,0 156,0 23,0 1,0 2,0 21,0 6,0 0,0 3,0 31,0 10,0 4,0 3,0 0,0 0,0 1,0 2,0

Ниже приведена ошибка, которую я получаю. Эта ошибка происходит из последней строки кода т.е. output.reduceByKey

Сообщение об ошибке - http://pastebin.com/0tqiiJQm

Не совсем уверен, как подойти к этой проблеме. Я попытался использовать MarshalSerializer, но он дал такую ​​же проблему.

------------------------------ Ответ -------------- ----------------------

У меня есть ответ от apache user list по тому же вопросу. В основном картограф/редуктор, который запускается в кластере оленьей кожи есть определение класса, и мы должны пройти класс, написав класс в другом модуле и приложив при настройке SparkContext с помощью

sc.addPyFile(os.path(HOMEDirectory + "module.py")) 

Спасибо всем за помощь вне.

ответ

0

Вы можете использовать массивы numpy, которые хорошо работают с искрой.

import numpy as np 

def row_map(text): 
    split_text = text.split() 
    # create numpy array from elements besides the first element 
    # which is the key 
    return split_text(0), np.array([float(v) for v in split_text[1:]]) 

from pyspark import SparkConf, SparkContext 
conf = (SparkConf() 
    .setMaster("local") 
    .setAppName("My app") 
    .set("spark.executor.memory", "1g")) 
sc = SparkContext(conf = conf) 

inp = sc.textFile("input.txt") 
output = inp.map(row_map).cache() 
#Below line is throwing error 
print output.reduceByKey(lambda a,b : np.add(a,b)).collect() 

более кратким и питоническим.

+0

Это решение классно и хорошо работает. Можете ли вы предложить, где бы я ошибся, если я хочу вернуть пользовательский объект класса вместо массива? Я планирую возвращать другие объекты вместе с этим массивом. – siddardha

+0

Причина, по которой вы столкнулись с проблемой, состоит в том, что класс тестирования не является сериализуемым. Посмотрите на документы python для получения дополнительной информации о том, как исправить это https://docs.python.org/2/library/pickle.html#pickling-and-unpickling-normal-class-instances . Другой вариант - создать ваши классы после того, как вы собрали выходные данные, например: data = output.reduceByKey (lambda a, b: np.add (a, b)). collect() final_output = [тестирование (значение) для значения в данных] Это будет работать только в том случае, если размер данных достаточно мал, чтобы обрабатываться непараллельно. – Anant

+0

Я опубликовал тот же вопрос в списке пользователей искры Apache, и я получил ответ о том, как использовать пользовательский класс, который я добавлю в самом вопросе для обмена знаниями. Я отвечу на ваш ответ, поскольку он решает проблему, с которой я столкнулся в некоторой степени – siddardha

 Смежные вопросы

  • Нет связанных вопросов^_^