2016-12-02 8 views
1

У меня есть неприятная проблема с использованием ноутбука jupyter с искровым.Как распределить классы с PySpark и Jupyter

мне нужно определить пользовательский класс внутри ноутбука и использовать его для выполнения некоторых операций на карте

from pyspark import SparkContext 
from pyspark import SparkConf 
from pyspark import SQLContext 

conf = SparkConf().setMaster("spark://192.168.10.11:7077")\ 
       .setAppName("app_jupyter/")\ 
       .set("spark.cores.max", "10") 

sc = SparkContext(conf=conf) 

data = [1, 2, 3, 4, 5] 
distData = sc.parallelize(data) 

class demo(object): 
    def __init__(self, value): 
     self.test = value + 10 
     pass 

distData.map(lambda x : demo(x)).collect() 

Это дает следующее сообщение об ошибке:

PicklingError: Can't pickle : attribute lookup main.demo failed

Я знаю, что эта ошибка о, но я could't выяснить решение ..

Я попытался:

  1. Определите файл python demo.py за пределами ноутбука. Это работает, но это такое уродливое решение ...
  2. Создать динамический модуль like this, а затем импортировать его потом ... Это дает ту же ошибку,

Что бы решение? ... Я хочу, чтобы все работало в том же ноутбуке

можно изменить что-то в:

  1. путь искры работает, возможно, некоторые конфигурации рассол
  2. Что-то в коде ... Используйте некоторый подход к статическому магии

ответ

1

Нет надежного и изящного обходного пути здесь, и это поведение не имеет особого отношения к Spark. This is all about fundamental design of the Python pickle

pickle can save and restore class instances transparently, however the class definition must be importable and live in the same module as when the object was stored.

Теоретически вы могли бы определить custom cell magic, которые:

  • Написать содержимое ячейки к модулю.
  • Импортируйте его.
  • Позвоните SparkContext.addPyFile, чтобы распространять модуль.
from IPython.core.magic import register_cell_magic 
import importlib 

@register_cell_magic 
def spark_class(line, cell): 
    module = line.strip() 
    f = "{0}.py".format(module) 

    with open(f, "w") as fw: 
     fw.write(cell) 

    globals()[module] = importlib.import_module(module) 
    sc.addPyFile(f) 
In [2]: %%spark_class foo 
    ...: class Foo(object): 
    ...:  def __init__(self, x): 
    ...:   self.x = x 
    ...:  def __repr__(self): 
    ...:   return "Foo({0})".format(self.x) 
    ...: 

In [3]: sc.parallelize([1, 2, 3]).map(lambda x: foo.Foo(x)).collect() 
Out[3]: [Foo(1), Foo(2), Foo(3)]  

но это один раз дело а. Как только файл помечен для распространения, его нельзя изменить или перераспределить. Кроме того, существует проблема перезагрузки импорта на удаленных хостах. Я могу придумать несколько более сложных схем, но это просто больше проблем, чем того стоит.

 Смежные вопросы

  • Нет связанных вопросов^_^