2017-02-07 11 views
1

Предположим, у меня есть следующий случайКак проверить, если столбец массива в другой массив столбцов в pyspark dataframe

from pyspark.sql.types import * 
schema = StructType([ # schema 
    StructField("id", StringType(), True), 
    StructField("ev", ArrayType(StringType()), True), 
    StructField("ev2", ArrayType(StringType()), True),]) 
df = spark.createDataFrame([{"id": "se1", "ev": ["ev11", "ev12"], "ev2": ["ev11"]}, 
          {"id": "se2", "ev": ["ev11"], "ev2": ["ev11", "ev12"]}, 
          {"id": "se3", "ev": ["ev21"], "ev2": ["ev11", "ev12"]}, 
          {"id": "se4", "ev": ["ev21", "ev22"], "ev2": ["ev21", "ev22"]}], 
          schema=schema) 

что дает мне:

df.show() 
+---+------------+------------+ 
| id|   ev|   ev2| 
+---+------------+------------+ 
|se1|[ev11, ev12]|  [ev11]| 
|se2|  [ev11]|[ev11, ev12]| 
|se3|  [ev21]|[ev11, ev12]| 
|se4|[ev21, ev22]|[ev21, ev22]| 
+---+------------+------------+ 

Я хочу создать новый столбец булево (или выберите только истинные случаи) для строк, где содержимое столбца «ev» находится внутри столбца «ev2», возвращается:

df_target.show() 
+---+------------+------------+ 
| id|   ev|   ev2| 
+---+------------+------------+ 
|se2|  [ev11]|[ev11, ev12]| 
|se4|[ev21, ev22]|[ev21, ev22]| 
+---+------------+------------+ 

или:

df_target.show() 
+---+------------+------------+-------+ 
| id|   ev|   ev2|evInEv2| 
+---+------------+------------+-------+ 
|se1|[ev11, ev12]|  [ev11]| false| 
|se2|  [ev11]|[ev11, ev12]| true| 
|se3|  [ev21]|[ev11, ev12]| false| 
|se4|[ev21, ev22]|[ev21, ev22]| true| 
+---+------------+------------+-------+ 

Я попытался с помощью isin метод:

df.withColumn('evInEv2', df['ev'].isin(df['ev2'])).show() 
+---+------------+------------+-------+ 
| id|   ev|   ev2|evInEv2| 
+---+------------+------------+-------+ 
|se1|[ev11, ev12]|  [ev11]| false| 
|se2|  [ev11]|[ev11, ev12]| false| 
|se3|  [ev21]|[ev11, ev12]| false| 
|se4|[ev21, ev22]|[ev21, ev22]| true| 
+---+------------+------------+-------+ 

Но, похоже, это только проверяет, является ли это тот же массив.

Я также пробовал функцию array_contains от pyspark.sql.functions, но принимает только один объект, а не массив для проверки.

У меня возникли трудности даже в поиске этого из-за формулировки правильной проблемы.

Спасибо!

ответ

3

Вот опция, использующая udf, где мы проверяем длину разницы между столбцами ev и ev2. Когда длина результирующей матрицы равна 0, или все элементы ev содержатся в пределах ev2, мы возвращаем True; в противном случае False.

def contains(x,y): 
    z = len(set(x) - set(y)) 
    if z == 0: 
    return True 
    else: 
    return False 

contains_udf = udf(contains) 
df.withColumn("evInEv2", contains_udf(df.ev,df.ev2)).show() 
+---+------------+------------+-------+ 
| id|   ev|   ev2|evInEv2| 
+---+------------+------------+-------+ 
|se1|[ev11, ev12]|  [ev11]| false| 
|se2|  [ev11]|[ev11, ev12]| true| 
|se3|  [ev21]|[ev11, ev12]| false| 
|se4|[ev21, ev22]|[ev21, ev22]| true| 
+---+------------+------------+-------+ 
+0

Это сработало! Благодаря! Я даже не думал об использовании udfs – dtj