38

Рассматривая новую искровую фреймворку api, неясно, можно ли изменять столбцы данных.Обновление столбца dataframe в искровом излучении

Как мне изменить значение в строке x столбец y данных?

В pandas это будет df.ix[x,y] = new_value

Edit: Закрепление, что было сказано ниже, вы не можете изменить существующий dataframe как он неизменен, но вы можете вернуть новый dataframe с требуемыми изменениями.

Если вы просто хотите, чтобы заменить значение в колонке на основе условия, как np.where:

from pyspark.sql import functions as F 

update_func = (F.when(F.col('update_col') == replace_val, new_value) 
       .otherwise(F.col('update_col'))) 
df = df.withColumn('new_column_name', update_func) 

Если вы хотите, чтобы выполнить какую-то операцию на колонке и создать новый столбец, который добавляется к dataframe:

import pyspark.sql.functions as F 
import pyspark.sql.types as T 

def my_func(col): 
    do stuff to column here 
    return transformed_value 

# if we assume that my_func returns a string 
my_udf = F.UserDefinedFunction(my_func, T.StringType()) 

df = df.withColumn('new_column_name', my_udf('update_col')) 

Если вы хотите новый столбец иметь такое же имя, как и старый столбец, можно добавить дополнительный шаг:

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col') 
+0

Если вы хотите получить доступ к DataFrame по индексу, вам нужно сначала создать индекс. См., Например, http://stackoverflow.com/questions/26828815/how-to-get-elemnt-by-index-in-spark-rdd-java. Или добавьте индексный столбец со своим собственным индексом. – fanfabbb

ответ

44

Пока вы не можете изменить столбец как таковой, вы можете работать с столбцом и возвращать новый DataFrame, отражающий это изменение. Для этого вы должны сначала создать UserDefinedFunction, применяя применяемую операцию, а затем выборочно применять эту функцию только к целевому столбцу. В Python:

from pyspark.sql.functions import UserDefinedFunction 
from pyspark.sql.types import StringType 

name = 'target_column' 
udf = UserDefinedFunction(lambda x: 'new_value', StringType()) 
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns]) 

new_df теперь имеет такую ​​же схему, как old_df (при условии, что old_df.target_column был типа StringType, а), но все значения в столбце target_column будет new_value.

+1

Это реальный ответ на проблему, спасибо! тем не менее, искровые задания не заканчиваются для меня, все исполнители получают лос. можете ли вы подумать об альтернативном пути? Я использую его с немного более сложным UDF, где я делаю преобразование в строки. Нет такого синтаксиса, подобного pandas, как new_df = old_df.col1.apply (lambda x: func (x))? – fanfabbb

+12

есть также: 'new_df = old_df.withColumn ('target_column', udf (df.name))' – fanfabbb

+0

@fanfabbb Не могли бы вы разместить свой UDF? Я не понимаю, почему сложная операция строки приведет к потерям исполнителей. Возможно, уменьшение размеров разделов может помочь, например. путем увеличения количества разделов. – karlson

12

DataFrames основаны на RDD. RDD являются неизменяемыми структурами и не позволяют обновлять элементы на месте. Чтобы изменить значения, вам нужно будет создать новый DataFrame, изменив исходный либо с помощью SQL-подобных операций DSL или RDD, таких как map.

Настоятельно рекомендуется использовать слайд-палубу: Introducing DataFrames in Spark for Large Scale Data Science.

+3

Что такое абстракция dataframe, которая не может быть выполнена в том же количестве строк с таблицей? – Luke

+0

«DataFrames внедряет новые упрощенные операторы для фильтрации, агрегации и проецирования по большим наборам данных. Внутри DataFrames используют логический оптимизатор Spark SQL для разумного планирования физического выполнения операций, чтобы хорошо работать на больших наборах данных» - https://databricks.com /blog/2015/03/13/announcing-spark-1-3.html – maasg

11

Как и в случае с maasg, вы можете создать новый DataFrame из результата карты, примененной к старым DataFrame. Пример данного DataFrame df с двумя рядами:

val newDf = sqlContext.createDataFrame(df.map(row => 
    Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema) 

Обратите внимание, что если типы столбцов меняются, вы должны дать ему правильную схему вместо df.schema. Проверьте апи из org.apache.spark.sql.Row доступных методов: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] или с помощью UDFs в Scala:

import org.apache.spark.sql.functions._ 

val toLong = udf[Long, String] (_.toLong) 

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName") 

и если имя столбца должна оставаться такой же, вы можете переименовать его обратно:

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName") 
23

Обычно при обновлении столбца мы хотим сопоставить старое значение с новым значением.Вот как это сделать в pyspark без UDF:

# update df[update_col], mapping old_value --> new_value 
from pyspark.sql import functions as F 
df = df.withColumn(update_col, 
    F.when(df[update_col]==old_value,new_value). 
    otherwise(df[update_col])). 
+0

Как использовать это, когда мой update_col - это список Ex- =: 'update_cols = ['col1', 'col2', 'col3']'? – GeekFactory

+0

Используйте цикл for. – Paul