0

Я использую PySpark в Jupyter на Azure. Я пытаюсь проверить использование UDF на фреймворке данных, однако UDF не выполняется.PySpark: UDF не работает на dataframe

Мой dataframe создан:

users = sqlContext.sql("SELECT DISTINCT userid FROM FoodDiaryData")

Я подтвердил это dataframe заполняемый 100 строк. В следующей ячейке я пытаюсь выполнить простой udf.

def iterateMeals(user): 
    print user 

users.foreach(iterateMeals) 

Это не производит выход. Я бы ожидал, что каждая запись в кадре данных будет напечатана. Однако, если я просто попробую iterateMeals('test'), он сгорит и распечатает 'test'. Я также попытался с помощью pyspark.sql.functions

from pyspark.sql.functions import udf 

def iterateMeals(user): 
    print user 
f_iterateMeals = udf(iterateMeals,LongType()) 

users.foreach(f_iterateMeals) 

Когда я пытаюсь это, я получаю следующее сообщение об ошибке:

Py4JError: Произошла ошибка при вызове o461. getnewargs. Трассировка: py4j.Py4JException: Метод getnewargs ([]) не существует

Может кто-то объяснить, где я пошло не так? Мне нужно будет выполнить udfs внутри .foreach данных для этого приложения.

ответ

1
  1. Вы не увидите выход, потому что print выполняется на рабочих узлах и переходит к соответствующему выходу. См. Why does foreach not bring anything to the driver program? для получения полного объяснения.

  2. foreach действует на RDD не DataFrame. UDFs недействительны в этом контексте.