У меня есть данные, которые поступают из Kafka через DStream. Я хочу выполнить извлечение функции, чтобы получить некоторые ключевые слова.Как использовать извлечение функции с помощью DStream в Apache Spark
Я не хочу ждать прибытия всех данных (поскольку он предназначен для непрерывного потока, который потенциально никогда не заканчивается), поэтому я надеюсь выполнить извлечение в кусках - мне не важно, если точность будет немного страдать.
До сих пор я собрал что-то вроде этого:
def extractKeywords(stream: DStream[Data]): Unit = {
val spark: SparkSession = SparkSession.builder.getOrCreate
val streamWithWords: DStream[(Data, Seq[String])] = stream map extractWordsFromData
val streamWithFeatures: DStream[(Data, Array[String])] = streamWithWords transform extractFeatures(spark) _
val streamWithKeywords: DStream[DataWithKeywords] = streamWithFeatures map addKeywordsToData
streamWithFeatures.print()
}
def extractFeatures(spark: SparkSession)
(rdd: RDD[(Data, Seq[String])]): RDD[(Data, Array[String])] = {
val df = spark.createDataFrame(rdd).toDF("data", "words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(numOfFeatures)
val rawFeatures = hashingTF.transform(df)
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val idfModel = idf.fit(rawFeatures)
val rescaledData = idfModel.transform(rawFeature)
import spark.implicits._
rescaledData.select("data", "features").as[(Data, Array[String])].rdd
}
Однако я получил java.lang.IllegalStateException: Haven't seen any document yet.
- Я не удивлен, как я только попробовать на металлолом все вместе, и я понимаю, что с тех пор я не жду при приеме некоторых данных генерируемая модель может быть пуста, когда я пытаюсь использовать ее для данных.
Каким будет правильный подход для решения этой проблемы?