2016-08-06 4 views
-1

Я вижу ошибку травильного:PySpark PicklingError

Could not pickle object as excessively deep recursion required.

Ниже приведен след назад:

Traceback (most recent call last): 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call 
    r = self.func(t, *rdds) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in 
    func = lambda t, rdd: old_func(rdd) 
    if rdd.count() > 0: 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count 
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum() 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum 
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold 
    vals = self.mapPartitions(func).collect() 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 773, in collect 
    port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd()) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2388, in _jrdd 
    pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2308, in _prepare_for_python_RDD 
    pickled_command = ser.dumps(command) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps 
    return cloudpickle.dumps(obj, 2) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps 
    cp.dump(obj) 
    File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 111, in dump 
    raise pickle.PicklingError(msg) 
    PicklingError: Could not pickle object as excessively deep recursion required. 

Вот часть моего кода трактир HighLevel который вызвал ошибку:

sc = SparkContext(appName="my_app") 

ssc = StreamingContext(sc, 1) 

kafka_stream = KafkaUtils.createDirectStream(ssc, full_topic_list, kafka_params, fromOffsets=offset_dict) 

messages = kafka_stream.map(lambda (k, v): json.loads(v)) 

messages.foreachRDD(lambda rdd: process(rdd, topic_list, sqlcontext)) 

В моей функции процесса есть счетчик rdd: if topic_rdd.count() > 0, который выдает ошибку.

ответ

0

Вы не можете передавать RDD и обрабатывать RDD в распределенных функциях (карты, сокращения и т. Д.).

+0

Спасибо, cftarnas. Каков наилучший способ выполнения функции для rdd, но если я передаю RDD. – ling

+0

@ling: Вы действительно не можете передавать RDD, они не разборчивы. Как решить вашу общую проблему зависит от того, что вы пытаетесь сделать, может помочь более подробный фрагмент кода. Это может быть так же просто, как предварительно вычислить topic_rdd.count(), а затем просто передать сами подсчеты. – cftarnas