2015-08-07 1 views
4

У меня есть набор данных, на основе которых я хочу создать классификационную модель. Каждый ряд имеет следующий вид:Как кодировать категориальные функции в Apache Spark

user1,class1,product1 
user1,class1,product2 
user1,class1,product5 
user2,class1,product2 
user2,class1,product5 
user3,class2,product1 

Есть около 1M пользователей, 2 класса и 1M продукции. То, что я хотел бы сделать дальше, - создать разреженные векторы (что уже поддерживается MLlib). НО для применения этой функции я должен сначала создать плотные векторы (с 0s). Другими словами, мне приходится бинарировать мои данные. Какой самый простой (или самый элегантный) способ сделать это?

Учитывая, что я новичок в отношении MLlib, могу ли я попросить вас предоставить конкретный пример? Я использую MLlib 1.2.

EDIT

Я закончил с следующим фрагментом кода, но оказываюсь очень медленно ... Любые другие идеи, при условии, что я могу использовать только MLlib 1.2?

val data = test11.map(x=> ((x(0) , x(1)) , x(2))).groupByKey().map(x=> (x._1 , x._2.toArray)).map{x=> 
    var lt : Array[Double] = new Array[Double](test12.size) 
    val id = x._1._1 
    val cl = x._1._2 
    val dt = x._2 
    var i = -1 
    test12.foreach{y => i += 1; lt(i) = if(dt contains y) 1.0 else 0.0} 
    val vs = Vectors.dense(lt) 
    (id , cl , vs) 
} 
+0

Может дать вам пример того, что вы хотите, чтобы плотный векторный вывод выглядел как этот вход? – mattinbits

+0

какую классификацию вы хотите сделать точно? т. е. если 'userX' и' classY', то, скорее всего, это будет 'productZ' или что-то еще? –

+0

нет действительно. Я буду использовать двоичную классификацию, где 'userX' - разреженный вектор значений, а' classY' - соответствующий класс. – user706838

ответ

8

Вы можете использовать spark.ml's OneHotEncoder.

Вы первый использование:

OneHotEncoder.categories(rdd, categoricalFields) 

Где categoricalField является последовательность индексов, при котором ваш RDD содержит категориальные данные. categories, учитывая набор данных и индекс столбцов, которые являются категориальными переменными, возвращает структуру, которая для каждого поля описывает значения, которые присутствуют в наборе данных. Эта карта предназначена для использования в качестве входных данных для метода кодирования:

OneHotEncoder.encode(rdd, categories) 

который возвращает ваш Векторизованных RDD[Array[T]].

+0

, который не доступен в MLlib 1.2 :-) –

+0

Да, не так ли, и я не могу обновить, к сожалению ... Пожалуйста, взгляните на мое редактирование. – user706838

+0

Это даже не кажется доступным в 1.4. –

4

Если вы используете встроенный OneHotEncoder, это не вариант, и у вас есть только одна переменная, реализующая одноразовый бедный человек, более или менее простой. Первая позволяет создать пример данных:

import org.apache.spark.mllib.linalg.{Vector, Vectors} 

val rdd = sc.parallelize(List(
    Array("user1", "class1", "product1"), 
    Array("user1", "class1", "product2"), 
    Array("user1", "class1", "product5"), 
    Array("user2", "class1", "product2"), 
    Array("user2", "class1", "product5"), 
    Array("user3", "class2", "product1"))) 

Далее мы должны создать отображение из значения в индексе:

val prodMap = sc.broadcast(rdd.map(_(2)).distinct.zipWithIndex.collectAsMap) 

и простой функции кодирования:

def encodeProducts(products: Iterable[String]): Vector = { 
    Vectors.sparse(
     prodMap.value.size, 
     products.map(product => (prodMap.value(product).toInt, 1.0)).toSeq 
    ) 
} 

Наконец мы может применяться к набору данных:

rdd.map(x => ((x(0), x(1)), x(2))).groupByKey.mapValues(encodeProducts) 

Относительно легко вывести выше, чтобы обрабатывать несколько переменных.

Edit:

Если количество продуктов является большим, чтобы сделать вещание полезным это должно быть возможно использовать join вместо этого.Во-первых, мы можем создать подобное отображение продукта индексировать, но сохранить его как РДУ:

import org.apache.spark.HashPartitioner 

val nPartitions = ??? 

val prodMapRDD = rdd 
    .map(_(2)) 
    .distinct 
    .zipWithIndex 
    .partitionBy(new HashPartitioner(nPartitions)) 
    .cache 

val nProducts = prodMapRDD.count // Should be < Int.MaxValue 

Далее мы перекроить ввода RDD получить PairRDD индексируются продукта:

val pairs = rdd 
    .map(rec => (rec(2), (rec(0), rec(1)))) 
    .partitionBy(new HashPartitioner(nPartitions)) 

Наконец мы можем join как

def indicesToVec(n: Int)(indices: Iterable[Long]): Vector = { 
    Vectors.sparse(n, indices.map(x => (x.toInt, 1.0)).toSeq) 
} 

pairs.join(prodMapRDD) 
    .values 
    .groupByKey 
    .mapValues(indicesToVec(nProducts.toInt)) 
+1

+1 для общего решения. У вас есть другое решение, которое не использует 'broadcast'? Я использую такое решение, как ваше, но иногда это не работает, потому что 'prodMap' слишком велик для трансляции. – emeth

+1

@emeth Это намного дороже, но должно быть возможно, что вы используете соединения. Подробнее см. В разделе «Редактирование». – zero323

0

Оригинальный вопрос задает самый простой способ указать категориальные функции из некатегорических.

В Spark ML вы можете использовать метод setMaxCategories VectorIndexer, где вам не нужно указывать поля - вместо этого он будет понимать как категорически те поля с меньшей или равной мощностью, которые имеют заданный номер (в данном случае 2) ,

val indexer = new VectorIndexer() 
.setInputCol("features") 
.setOutputCol("indexed") 
.setMaxCategories(10) 

Подробнее см. this reply.