1

У меня есть два больших dataframes [a], у которых есть все события, идентифицированные id [b] списком идентификаторов. Я хочу фильтровать [a] на основе идентификаторов в [b], используя реализацию stat.bloomFilter в искровом режиме 2.0.0Использование stat.bloomFilter в Spark 2.0.0 для фильтрации другого фрейма данных

Однако я не вижу никаких операций в API набора данных, чтобы присоединиться к фильтру цветения к данным frame [a]

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5)) 
val df1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3") 

val in2 = spark.sparkContext.parallelize(List(0, 1, 2)) 
val df2 = in2.map(x => (x)).toDF("c1") 

val expectedNumItems: Long = 1000 
val fpp: Double = 0.005 

val sbf = df.stat.bloomFilter($"c1", expectedNumItems, fpp) 
val sbf2 = df2.stat.bloomFilter($"c1", expectedNumItems, fpp) 

Каков наилучший способ фильтрации 'df1' на основе значений в df2?

Спасибо!

ответ

2

Вы можете использовать UDF:

def might_contain(f: org.apache.spark.util.sketch.BloomFilter) = udf((x: Int) => 
    if(x != null) f.mightContain(x) else false) 

df1.where(might_contain(sbf2)($"C1")) 
0

Я думаю, что нашел правильный способ сделать это, но все равно хотел бы указать указатели, чтобы увидеть, есть ли лучшие способы управления этим.

Вот мое решение -

val in1 = spark.sparkContext.parallelize(List(0, 1, 2, 3, 4, 5)) 
val d1 = in1.map(x => (x, x+1, x+2)).toDF("c1", "c2", "c3") 

val in2 = spark.sparkContext.parallelize(List(0, 1, 2)) 
val d2 = in2.map(x => (x)).toDF("c1") 

val s2 = d2.stat.bloomFilter($"c1", expectedNumItems, fpp) 

val a = spark.sparkContext.broadcast(s2) 

val x = d1.rdd.filter(x => a.value.mightContain(x(0))) 

case class newType(c1: Int, c2: Int, c3: Int) extends Serializable 

val xDF = x.map(y => newType(y(0).toString.toInt, y(1).toString.toInt, y(2).toString.toInt)).toDF() 

scala> d1.show(10) 
+---+---+---+ 
| c1| c2| c3| 
+---+---+---+ 
| 0| 1| 2| 
| 1| 2| 3| 
| 2| 3| 4| 
| 3| 4| 5| 
| 4| 5| 6| 
| 5| 6| 7| 
+---+---+---+ 

scala> d2.show(10) 
+---+ 
| c1| 
+---+ 
| 0| 
| 1| 
| 2| 
+---+ 

scala> xDF.show(10) 
+---+---+---+ 
| c1| c2| c3| 
+---+---+---+ 
| 0| 1| 2| 
| 1| 2| 3| 
| 2| 3| 4| 
+---+---+---+ 
0

Я построил неявный класс, который оборачивает https://stackoverflow.com/a/41989703/6723616 Комментарии приветствуются!

/** 
    * Copyright 2017 Yahoo, Inc. 
    * Zlib license: https://www.zlib.net/zlib_license.html 
    */ 

package me.klotz.spark.utils 

import org.apache.spark.sql.functions._ 
import org.apache.spark.sql.Dataset 
import org.apache.spark.sql.Row 
import org.apache.spark.util.sketch.BloomFilter 
import org.apache.spark.SparkContext 

object BloomFilterEnhancedJoin { 

    // not parameterized for field typel; assumes string 
    /** 
    * Like .join(bigDF, smallDF, but accelerated with a Bloom filter. 
    * You pass in a size estimate of the bigDF, and a ratio of acceptable false positives out of the expected result set size. 
    * ratio=1 is a good start; that will result in about 50% false positives in the big-small join, so the filter accepts 
    * about as many as it passes, rather than rejecting almost all. Pass in a size estimate of the big dataframe 
    * to avoid enumerating it. The small DataFrame gets enumerated anyway. 
    * 
    * Example use: 
    * <code> 
    * import me.klotz.spark.utils.BloomFilterEnhancedJoin._ 
    * val (dups_joined, bloomFilterBroadcast) = df_big.joinBloom(1024L*1024L*1024L, dups, 10.0, "id") 
    * dups_joined.write.format("orc").save("dups") 
    * bloomFilterBroadcast.unpersist 
    * <code> 
    */ 
    implicit class BloomFilterEnhancedJoiner(bigdf:Dataset[Row]) { 
    /** 
     * You should call bloomFilterBroadcast.unpersist after 
     */ 
    def joinBloom(bigDFCountEstimate:Long, smallDF: Dataset[Row], ratio:Double, field:String) = { 
     val sc = smallDF.sparkSession.sparkContext 
     val smallDFCount = smallDF.count 
     val fpr = smallDFCount.toDouble/bigDFCountEstimate.toDouble/ratio 
     println(s"fpr=${fpr} = smallDFCount=${smallDFCount}/bigDFCountEstimate=${bigDFCountEstimate}/ratio=${ratio}") 

     val bloomFilterBroadcast = sc.broadcast((smallDF.stat.bloomFilter(field, smallDFCount, fpr))) 
     val mightContain = udf((x: String) => if (x != null) bloomFilterBroadcast.value.mightContainString(x) else false) 

     (bigdf.filter(mightContain(col(field))).join(smallDF, field), bloomFilterBroadcast) 
    } 
    } 

}