2017-02-21 25 views
2

У меня есть dataframe с паркетным файлом, и мне нужно добавить новый столбец с некоторыми случайными данными, но мне нужны эти случайные данные друг для друга. Это мой фактический код и текущая версия искры 1.5.1-CDH-5.5.2:О том, как добавить новый столбец в существующий DataFrame со случайными значениями в Scala

val mydf = sqlContext.read.parquet("some.parquet") 
// mydf.count() 
// 63385686 
mydf.cache 

val r = scala.util.Random 
import org.apache.spark.sql.functions.udf 
def myNextPositiveNumber :String = { (r.nextInt(Integer.MAX_VALUE) + 1).toString.concat("D")} 
val myFunction = udf(myNextPositiveNumber _) 
val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber)) 

с этим кодом, у меня есть эти данные:

scala> myNewDF.select("myNewColumn").show(10,false) 
+-----------+ 
|myNewColumn| 
+-----------+ 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
|889488717D | 
+-----------+ 

Похоже, что udf myNextPositiveNumber вызывается только один раз, не так ли?

обновление подтвердится, есть только один отчетливое значение:

scala> myNewDF.select("myNewColumn").distinct.show(50,false) 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
17/02/21 13:23:11 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
... 

+-----------+                 
|myNewColumn| 
+-----------+ 
|889488717D | 
+-----------+ 

что я делаю неправильно?

Update 2: наконец-то, с помощью @ user6910411 У меня есть этот код:

val mydf = sqlContext.read.parquet("some.parquet") 
// mydf.count() 
// 63385686 
mydf.cache 

val r = scala.util.Random 

import org.apache.spark.sql.functions.udf 

val accum = sc.accumulator(1) 

def myNextPositiveNumber():String = { 
    accum+=1 
    accum.value.toString.concat("D") 
} 

val myFunction = udf(myNextPositiveNumber _) 

val myNewDF = mydf.withColumn("myNewColumn",lit(myNextPositiveNumber)) 

myNewDF.select("myNewColumn").count 

// 63385686 

обновления 3

Фактический код генерирует данные, как это:

scala> mydf.select("myNewColumn").show(5,false) 
17/02/22 11:01:57 WARN ParquetRecordReader: Can not initialize counter due to context is not a instance of TaskInputOutputContext, but is org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl 
+-----------+ 
|myNewColumn| 
+-----------+ 
|2D   | 
|2D   | 
|2D   | 
|2D   | 
|2D   | 
+-----------+ 
only showing top 5 rows 

Похоже функция udf вызывается только один раз, не так ли? Мне нужен новый случайный элемент в этом столбце.

обновление 4 @ user6910411

у меня есть этот фактический код, который увеличивает идентификатор, но не конкатенации конечного символа, это странно. Это мой код:

import org.apache.spark.sql.functions.udf 


val mydf = sqlContext.read.parquet("some.parquet") 

mydf.cache 

def myNextPositiveNumber():String = monotonically_increasing_id().toString().concat("D") 

val myFunction = udf(myNextPositiveNumber _) 

val myNewDF = mydf.withColumn("myNewColumn",expr(myNextPositiveNumber)) 

scala> myNewDF.select("myNewColumn").show(5,false) 
17/02/22 12:00:02 WARN Executor: 1 block locks were not released by TID = 1: 
[rdd_4_0] 
+-----------+ 
|myNewColumn| 
+-----------+ 
|0   | 
|1   | 
|2   | 
|3   | 
|4   | 
+-----------+ 

мне нужно что-то вроде:

+-----------+ 
|myNewColumn| 
+-----------+ 
|1D   | 
|2D   | 
|3D   | 
|4D   | 
+-----------+ 

ответ

8

Спарка> = 2,3

можно отключить некоторые оптимизации с использованием asNondeterministic метода:

import org.apache.spark.sql.expressions.UserDefinedFunction 

val f: UserDefinedFunction = ??? 
val fNonDeterministic: UserDefinedFunction = f.asNondeterministic 

Пожалуйста, убедитесь, что вы понимаете гуару перед использованием этой опции.

Спарка < 2,3

Функция, которая передается UDF должна быть детерминированной (с возможным исключением SPARK-20586) и нульарными функциями вызовов могут быть заменены константами. Если вы хотите, чтобы генерировать случайные числа использовать на встроенных функций:

  • rand - Создание случайного столбца с независимыми и одинаково распределенными (i.i.d.) образцов из U [0,0, 1,0].
  • randn - Создайте столбец с независимыми и идентично распределенными (i.i.d.) образцами из стандартного нормального распределения.

и преобразовывать выходной сигнал, чтобы получить требуемое распределение, например:

(rand * Integer.MAX_VALUE).cast("bigint").cast("string")