3

Я пытаюсь создать модель LDA в файле JSON.MatchError при доступе к столбцу вектора в Spark 2.0

Создание искровой контекста с файлом JSON:

import org.apache.spark.sql.SparkSession 

val sparkSession = SparkSession.builder 
    .master("local") 
    .appName("my-spark-app") 
    .config("spark.some.config.option", "config-value") 
    .getOrCreate() 

val df = spark.read.json("dbfs:/mnt/JSON6/JSON/sampleDoc.txt") 

Отображение df должен показать DataFrame

display(df) 

разметить текст

import org.apache.spark.ml.feature.RegexTokenizer 

// Set params for RegexTokenizer 
val tokenizer = new RegexTokenizer() 
       .setPattern("[\\W_]+") 
       .setMinTokenLength(4) // Filter away tokens with length < 4 
       .setInputCol("text") 
       .setOutputCol("tokens") 

// Tokenize document 
val tokenized_df = tokenizer.transform(df) 

Это должно быть дис играть в tokenized_df

display(tokenized_df) 

Получить stopwords

%sh wget http://ir.dcs.gla.ac.uk/resources/linguistic_utils/stop_words > -O /tmp/stopwords 

дополнительно: копирование стоп-слова в папку TMP

%fs cp file:/tmp/stopwords dbfs:/tmp/stopwords 

Собирая все stopwords

val stopwords = sc.textFile("/tmp/stopwords").collect() 

отфильтровывая stopwords

import org.apache.spark.ml.feature.StopWordsRemover 

// Set params for StopWordsRemover 
val remover = new StopWordsRemover() 
        .setStopWords(stopwords) // This parameter is optional 
        .setInputCol("tokens") 
        .setOutputCol("filtered") 

// Create new DF with Stopwords removed 
val filtered_df = remover.transform(tokenized_df) 

Отображение отфильтрованный df следует проверить stopwords был удален

display(filtered_df) 

Векторизация частоты встречаемости слов

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.sql.Row 
import org.apache.spark.ml.feature.CountVectorizer 

// Set params for CountVectorizer 
val vectorizer = new CountVectorizer() 
       .setInputCol("filtered") 
       .setOutputCol("features") 
       .fit(filtered_df) 

Проверьте vectorizer

vectorizer.transform(filtered_df) 
      .select("id", "text","features","filtered").show() 

После этого я вижу проблему при установке этого vectorizer в LDA. Вопрос, который я считаю CountVectorizer, дает разреженный вектор, но LDA требует плотного вектора. Все еще пытаюсь выяснить проблему.

Вот исключение, где карта не может преобразовать.

import org.apache.spark.mllib.linalg.Vector 
val ldaDF = countVectors.map { 
      case Row(id: String, countVector: Vector) => (id, countVector) 
      } 
display(ldaDF) 

Исключение:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4083.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4083.0 (TID 15331, 10.209.240.17): scala.MatchError: [0,(1252,[13,17,18,20,30,37,45,50,51,53,63,64,96,101,108,125,174,189,214,221,224,227,238,268,291,309,328,357,362,437,441,455,492,493,511,528,561,613,619,674,764,823,839,980,1098,1143],[1.0,1.0,2.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,3.0,1.0,2.0,1.0,5.0,1.0,2.0,2.0,1.0,4.0,1.0,2.0,3.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,1.0,1.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,1.0])] (of class org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema) 

Существует рабочий образец для LDA, который не бросает любой вопрос

import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.sql.Row 
import org.apache.spark.mllib.linalg.Vector 
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA} 

val a = Vectors.dense(Array(1.0,2.0,3.0)) 
val b = Vectors.dense(Array(3.0,4.0,5.0)) 
val df = Seq((1L,a),(2L,b),(2L,a)).toDF 

val ldaDF = df.map { case Row(id: Long, countVector: Vector) => (id, countVector) } 

val model = new LDA().setK(3).run(ldaDF.javaRDD) 
display(df) 

Единственное различие во втором фрагменте кода мы имеем плотную матрицу ,

ответ

13

Это не имеет никакого отношения к разреженности. Поскольку Spark 2.0.0 ML Transformers больше не генерирует o.a.s.mllib.linalg.VectorUDT, а o.a.s.ml.linalg.VectorUDT и отображаются локально в подклассы o.a.s.ml.linalg.Vector.Они несовместимы со старым API MLLib, который движется к устареванию в Spark 2.0.0.

Вы можете конвертировать между к «старой» с помощью Vectors.fromML:

import org.apache.spark.mllib.linalg.{Vectors => OldVectors} 
import org.apache.spark.ml.linalg.{Vectors => NewVectors} 

OldVectors.fromML(NewVectors.dense(1.0, 2.0, 3.0)) 
OldVectors.fromML(NewVectors.sparse(5, Seq(0 -> 1.0, 2 -> 2.0, 4 -> 3.0))) 

но больше смысла использовать ML реализацию LDA, если вы уже используете ML трансформаторов.

Для удобства вы можете использовать неявные преобразования:

import scala.languageFeature.implicitConversions 

object VectorConversions { 
    import org.apache.spark.mllib.{linalg => mllib} 
    import org.apache.spark.ml.{linalg => ml} 

    implicit def toNewVector(v: mllib.Vector) = v.asML 
    implicit def toOldVector(v: ml.Vector) = mllib.Vectors.fromML(v) 
} 
+2

также сообщения об ошибках, связанных с этим типом несоответствия очень запутанным. Например, «Исключение в потоке» main «org.apache.spark.sql.AnalysisException: не может разрешить« UDF (VecFunction) »из-за несоответствия типа данных: аргумент 1 требует векторного типа, однако« VecFunction »имеет векторный тип; ' Обратите внимание, что как аргумент, так и ожидаемый ввод называются векторным типом. –

+0

не должно быть 'org.apache.spark.mllib. * Linalg * .Vectors.fromML'? Ох и кстати это было полезно;) – javadba

-3

Решение очень просто ребята .. Ниже

//import org.apache.spark.mllib.linalg.Vector 
import org.apache.spark.ml.linalg.Vector 
0

Я изменил:

val ldaDF = countVectors.map { 
      case Row(id: String, countVector: Vector) => (id, countVector) 
      } 

к:

val ldaDF = countVectors.map { case Row(docId: String, features: MLVector) => 
           (docId.toLong, Vectors.fromML(features)) } 

И это сработало, как шарм! Он выровнен с тем, что написал @ zero323.

Список импорта:

import org.apache.spark.ml.feature.{CountVectorizer, RegexTokenizer, StopWordsRemover} 
import org.apache.spark.ml.linalg.{Vector => MLVector} 
import org.apache.spark.mllib.clustering.{LDA, OnlineLDAOptimizer} 
import org.apache.spark.mllib.linalg.Vectors 
import org.apache.spark.sql.{Row, SparkSession}