2016-12-06 4 views
2

У меня есть данные, которые поступают из Kafka через DStream. Я хочу выполнить извлечение функции, чтобы получить некоторые ключевые слова.Как использовать извлечение функции с помощью DStream в Apache Spark

Я не хочу ждать прибытия всех данных (поскольку он предназначен для непрерывного потока, который потенциально никогда не заканчивается), поэтому я надеюсь выполнить извлечение в кусках - мне не важно, если точность будет немного страдать.

До сих пор я собрал что-то вроде этого:

def extractKeywords(stream: DStream[Data]): Unit = { 

    val spark: SparkSession = SparkSession.builder.getOrCreate 

    val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData 

    val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _ 

    val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData 

    streamWithFeatures.print() 
} 

def extractFeatures(spark: SparkSession) 
        (rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = { 

    val df = spark.createDataFrame(rdd).toDF("data", "words") 

    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures) 
    val rawFeatures = hashingTF.transform(df) 

    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
    val idfModel = idf.fit(rawFeatures) 

    val rescaledData = idfModel.transform(rawFeature) 

    import spark.implicits._ 
    rescaledData.select("data", "features").as[(Data, Array[String])].rdd 
} 

Однако я получил java.lang.IllegalStateException: Haven't seen any document yet. - Я не удивлен, как я только попробовать на металлолом все вместе, и я понимаю, что с тех пор я не жду при приеме некоторых данных генерируемая модель может быть пуста, когда я пытаюсь использовать ее для данных.

Каким будет правильный подход для решения этой проблемы?

ответ

0

Я использовал советы от комментариев и разделить процедуру на 2 пробегов:

  • один рассчитанной IDF модель и сохраняет его в файл

    def trainFeatures(idfModelFile: File, rdd: RDD[(String, Seq[String])]) = { 
        val session: SparkSession = SparkSession.builder.getOrCreate 
    
        val wordsDf = session.createDataFrame(rdd).toDF("data", "words") 
    
        val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
        val featurizedDf = hashingTF.transform(wordsDf) 
    
        val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features") 
        val idfModel = idf.fit(featurizedDf) 
    
        idfModel.write.save(idfModelFile.getAbsolutePath) 
    } 
    
  • тот, который читает IDF модель из файла и просто запускает его на всю входящую информацию

    val idfModel = IDFModel.load(idfModelFile.getAbsolutePath) 
    
    val documentDf = spark.createDataFrame(rdd).toDF("update", "document") 
    
    val tokenizer = new Tokenizer().setInputCol("document").setOutputCol("words") 
    val wordsDf = tokenizer.transform(documentDf) 
    
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures") 
    val featurizedDf = hashingTF.transform(wordsDf) 
    
    val extractor = idfModel.setInputCol("rawFeatures").setOutputCol("features") 
    val featuresDf = extractor.transform(featurizedDf) 
    
    featuresDf.select("update", "features") 
    

 Смежные вопросы

  • Нет связанных вопросов^_^