2017-01-05 9 views
2

Я изучаю Spark Streaming через PySpark и получаю ошибку при попытке использовать функцию transform с take.Преобразованный DStream в pyspark дает ошибку при вызове pprint

Я могу с успехом использовать sortBy против DStream через transform и pprint результат.

author_counts_sorted_dstream = author_counts_dstream.transform\ 
    (lambda foo:foo\ 
    .sortBy(lambda x:x[0].lower())\ 
    .sortBy(lambda x:x[1],ascending=False)) 
author_counts_sorted_dstream.pprint() 

Но если я использую take следуя той же схеме и попытаться pprint его:

top_five = author_counts_sorted_dstream.transform\ 
    (lambda rdd:rdd.take(5)) 
top_five.pprint() 

работа завершается с

Py4JJavaError: An error occurred while calling o25.awaitTermination. 
: org.apache.spark.SparkException: An exception was raised by Python: 
Traceback (most recent call last): 
    File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call 
    return r._jrdd 
AttributeError: 'list' object has no attribute '_jrdd' 

Вы можете увидеть полный код и вывод в the notebook here.

Что я делаю неправильно?

ответ

2

Функция, которую вы переходите на transform, должна быть преобразована с RDD в RDD. Если вы используете действие, как take, вы должны преобразовать результат обратно в RDD:

sc: SparkContext = ... 

author_counts_sorted_dstream.transform(
    lambda rdd: sc.parallelize(rdd.take(5)) 
) 

В отличие RDD.sortBy используется трансформация (возвращает RDD), поэтому нет необходимости в дальнейшем распараллеливания.

На стороне записки следующие функции:

lambda foo: foo \ 
    .sortBy(lambda x:x[0].lower()) \ 
    .sortBy(lambda x:x[1], ascending=False) 

не имеет особого смысла. Помните, что Spark sort by shuffle поэтому нестабилен. Если вы хотите отсортировать по нескольким полям, вы должны использовать составной ключ:

lambda x: (x[0].lower(), -x[1])