-1
Я вижу ошибку травильного:PySpark PicklingError
Could not pickle object as excessively deep recursion required.
Ниже приведен след назад:
Traceback (most recent call last):
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 62, in call
r = self.func(t, *rdds)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/streaming/dstream.py", line 159, in
func = lambda t, rdd: old_func(rdd)
if rdd.count() > 0:
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1006, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 997, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 871, in fold
vals = self.mapPartitions(func).collect()
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 773, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2388, in _jrdd
pickled_cmd, bvars, env, includes = _prepare_for_python_RDD(self.ctx, command, self)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2308, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 428, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 646, in dumps
cp.dump(obj)
File "/usr/hdp/current/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 111, in dump
raise pickle.PicklingError(msg)
PicklingError: Could not pickle object as excessively deep recursion required.
Вот часть моего кода трактир HighLevel который вызвал ошибку:
sc = SparkContext(appName="my_app")
ssc = StreamingContext(sc, 1)
kafka_stream = KafkaUtils.createDirectStream(ssc, full_topic_list, kafka_params, fromOffsets=offset_dict)
messages = kafka_stream.map(lambda (k, v): json.loads(v))
messages.foreachRDD(lambda rdd: process(rdd, topic_list, sqlcontext))
В моей функции процесса есть счетчик rdd: if topic_rdd.count() > 0
, который выдает ошибку.
Спасибо, cftarnas. Каков наилучший способ выполнения функции для rdd, но если я передаю RDD. – ling
@ling: Вы действительно не можете передавать RDD, они не разборчивы. Как решить вашу общую проблему зависит от того, что вы пытаетесь сделать, может помочь более подробный фрагмент кода. Это может быть так же просто, как предварительно вычислить topic_rdd.count(), а затем просто передать сами подсчеты. – cftarnas