2016-07-13 6 views
13

Мне нужно удалить акценты с символов на испанском и других языках из разных наборов данных.Каков наилучший способ удаления акцентов с фреймами данных apache в PySpark?

Я уже сделал функцию, основанную на коде, представленном в этом post, который удаляет специальные акценты. Проблема в том, что функция медленная, потому что она использует UDF. Мне просто интересно, могу ли я улучшить производительность своей функции, чтобы получить результаты за меньшее время, потому что это хорошо для небольших фреймов данных, но не для больших.

Заранее спасибо.

Вот код, вы сможете запустить его, как это представлено:

# Importing sql types 
from pyspark.sql.types import StringType, IntegerType, StructType, StructField 
from pyspark.sql.functions import udf, col 
import unicodedata 

# Building a simple dataframe: 
schema = StructType([StructField("city", StringType(), True), 
        StructField("country", StringType(), True), 
        StructField("population", IntegerType(), True)]) 

countries = ['Venezuela', '[email protected]', 'Brazil', 'Spain'] 
cities = ['Maracaibó', 'New York', ' São Paulo ', '~Madrid'] 
population = [37800000,19795791,12341418,6489162] 

# Dataframe: 
df = sqlContext.createDataFrame(list(zip(cities, countries, population)), schema=schema) 

df.show() 

class Test(): 
    def __init__(self, df): 
     self.df = df 

    def clearAccents(self, columns): 
     """This function deletes accents in strings column dataFrames, 
     it does not eliminate main characters, but only deletes special tildes. 

     :param columns String or a list of column names. 
     """ 
     # Filters all string columns in dataFrame 
     validCols = [c for (c, t) in filter(lambda t: t[1] == 'string', self.df.dtypes)] 

     # If None or [] is provided with column parameter: 
     if (columns == "*"): columns = validCols[:] 

     # Receives a string as an argument 
     def remove_accents(inputStr): 
      # first, normalize strings: 
      nfkdStr = unicodedata.normalize('NFKD', inputStr) 
      # Keep chars that has no other char combined (i.e. accents chars) 
      withOutAccents = u"".join([c for c in nfkdStr if not unicodedata.combining(c)]) 
      return withOutAccents 

     function = udf(lambda x: remove_accents(x) if x != None else x, StringType()) 
     exprs = [function(col(c)).alias(c) if (c in columns) and (c in validCols) else c for c in self.df.columns] 
     self.df = self.df.select(*exprs) 

foo = Test(df) 
foo.clearAccents(columns="*") 
foo.df.show() 
+0

После работы в этой функции, остальная часть кода может можно найти в [библиотеке преобразований] (https://github.com/mood-agency/optimus) –

ответ

4

Одним из возможных улучшений является создание пользовательского Transformer, который будет обрабатывать нормализацию Unicode и соответствующую оболочку Python. Это должно сократить общие накладные расходы на передачу данных между JVM и Python и не требует каких-либо изменений в самой Spark или для доступа к частному API.

На JVM стороне вам нужен трансформатор, подобный этому:

package net.zero323.spark.ml.feature 

import java.text.Normalizer 
import org.apache.spark.ml.UnaryTransformer 
import org.apache.spark.ml.param._ 
import org.apache.spark.ml.util._ 
import org.apache.spark.sql.types.{DataType, StringType} 

class UnicodeNormalizer (override val uid: String) 
    extends UnaryTransformer[String, String, UnicodeNormalizer] { 

    def this() = this(Identifiable.randomUID("unicode_normalizer")) 

    private val forms = Map(
    "NFC" -> Normalizer.Form.NFC, "NFD" -> Normalizer.Form.NFD, 
    "NFKC" -> Normalizer.Form.NFKC, "NFKD" -> Normalizer.Form.NFKD 
) 

    val form: Param[String] = new Param(this, "form", "unicode form (one of NFC, NFD, NFKC, NFKD)", 
    ParamValidators.inArray(forms.keys.toArray)) 

    def setN(value: String): this.type = set(form, value) 

    def getForm: String = $(form) 

    setDefault(form -> "NFKD") 

    override protected def createTransformFunc: String => String = { 
    val normalizerForm = forms($(form)) 
    (s: String) => Normalizer.normalize(s, normalizerForm) 
    } 

    override protected def validateInputType(inputType: DataType): Unit = { 
    require(inputType == StringType, s"Input type must be string type but got $inputType.") 
    } 

    override protected def outputDataType: DataType = StringType 
} 

Соответствующее определение сборки:

name := "unicode-normalization" 

version := "1.0" 

crossScalaVersions := Seq("2.10.6", "2.11.8") 

organization := "net.zero323" 

val sparkVersion = "1.6.2" 

libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % sparkVersion, 
    "org.apache.spark" %% "spark-sql" % sparkVersion, 
    "org.apache.spark" %% "spark-mllib" % sparkVersion 
) 

На стороне Python вам нужно обертку, подобное этому. Если вы используете 2.0+ keyword_only, он был перемещен в начало pyspark.

from pyspark.ml.param.shared import * 
from pyspark.ml.util import keyword_only 
from pyspark.ml.wrapper import JavaTransformer 

class UnicodeNormalizer(JavaTransformer, HasInputCol, HasOutputCol): 

    @keyword_only 
    def __init__(self, form="NFKD", inputCol=None, outputCol=None): 
     super(UnicodeNormalizer, self).__init__() 
     self._java_obj = self._new_java_obj(
      "net.zero323.spark.ml.feature.UnicodeNormalizer", self.uid) 
     self.form = Param(self, "form", 
      "unicode form (one of NFC, NFD, NFKC, NFKD)") 
     kwargs = self.__init__._input_kwargs 
     self.setParams(**kwargs) 

    @keyword_only 
    def setParams(self, form="NFKD", inputCol=None, outputCol=None): 
     kwargs = self.setParams._input_kwargs 
     return self._set(**kwargs) 

    def setForm(self, value): 
     return self._set(form=value) 

    def getForm(self): 
     return self.getOrDefault(self.form) 

Сложение Scala пакет:

sbt +package 

включить его при запуске оболочки или отправить. Например, для сборки Spark с помощью Scala 2.10:

bin/pyspark --jars path-to/target/scala-2.10/unicode-normalization_2.10-1.0.jar \ 
--driver-class-path path-to/target/scala-2.10/unicode-normalization_2.10-1.0.jar 

и вы должны быть готовы к работе. Все, что осталось немного регулярных выражений магии:

from pyspark.sql.functions import regexp_replace 

normalizer = UnicodeNormalizer(form="NFKD", 
    inputCol="text", outputCol="text_normalized") 

df = sc.parallelize([ 
    (1, "Maracaibó"), (2, "New York"), 
    (3, " São Paulo "), (4, "~Madrid") 
]).toDF(["id", "text"]) 

(normalizer 
    .transform(df) 
    .select(regexp_replace("text_normalized", "\p{M}", "")) 
    .show()) 

## +--------------------------------------+ 
## |regexp_replace(text_normalized,\p{M},)| 
## +--------------------------------------+ 
## |        Maracaibo| 
## |        New York| 
## |       Sao Paulo | 
## |        ~Madrid| 
## +--------------------------------------+ 

Пожалуйста, обратите внимание, что это такие же правила, как встроенные в текстовых трансформаторах и не является нулевым безопасным.Вы можете легко исправить это, проверив null в createTransformFunc.

1

Это решение только Python, но это только полезно, если число возможных акцентов низкая (например, один единый язык как испанский), а замены символов указаны вручную.

Кажется, что нет встроенного способа делать то, что вы просили напрямую без UDF, однако вы можете связать много вызовов regexp_replace, чтобы заменить каждый возможный акцентный символ. Я тестировал производительность этого решения, и выясняется, что он работает только быстрее, если у вас есть очень ограниченный набор акцентов для замены. В этом случае он может быть быстрее, чем UDF, потому что он оптимизирован за пределами Python.

from pyspark.sql.functions import col, regexp_replace 

accent_replacements_spanish = [ 
    (u'á', 'a'), (u'Á', 'A'), 
    (u'é', 'e'), (u'É', 'E'), 
    (u'í', 'i'), (u'Í', 'I'), 
    (u'ò', 'o'), (u'Ó', 'O'), 
    (u'ú|ü', 'u'), (u'Ú|Ű', 'U'), 
    (u'ñ', 'n') 
    # see http://stackoverflow.com/a/18123985/3810493 for other characters 

    # this will convert other non ASCII characters to a question mark: 
    ('[^\x00-\x7F]', '?') 
] 

def remove_accents(column): 
    r = col(column) 
    for a, b in accent_replacements_spanish: 
     r = regexp_replace(r, a, b) 
    return r.alias('remove_accents(' + column + ')') 

df = sqlContext.createDataFrame([['Olà'], ['Olé']], ['str']) 
df.select(remove_accents('str')).show() 

Я не сравнивал производительность с другими ответами, и эта функция не так вообще, но по крайней мере, стоит задуматься, потому что вам не нужно, чтобы добавить Scala или Java в процессе сборки.

+0

Я удалил свои комментарии, но вы могли бы добавить примечание о том, что оно не эквивалентно коду в que Stion? – zero323

+0

Я обновил свой ответ. Теперь должно быть ясно, что это не совсем эквивалентно и что оно имеет ограниченное применение. Спасибо за ответ. –

4

Другой способ для выполнения с помощью питона Unicode Database:

import unicodedata 
import sys 

from pyspark.sql.functions import translate, regexp_replace 

def make_trans(): 
    matching_string = "" 
    replace_string = "" 

    for i in range(ord(" "), sys.maxunicode): 
     name = unicodedata.name(chr(i), "") 
     if "WITH" in name: 
      try: 
       base = unicodedata.lookup(name.split(" WITH")[0]) 
       matching_string += chr(i) 
       replace_string += base 
      except KeyError: 
       pass 

    return matching_string, replace_string 

def clean_text(c): 
    matching_string, replace_string = make_trans() 
    return translate(
     regexp_replace(c, "\p{M}", ""), 
     matching_string, replace_string 
    ).alias(c) 

Итак, теперь давайте проверим это:

df = sc.parallelize([ 
(1, "Maracaibó"), (2, "New York"), 
(3, " São Paulo "), (4, "~Madrid"), 
(5, "São Paulo"), (6, "Maracaibó") 
]).toDF(["id", "text"]) 

df.select(clean_text("text")).show() 
## +---------------+ 
## |   text| 
## +---------------+ 
## |  Maracaibo| 
## |  New York| 
## | Sao Paulo | 
## |  ~Madrid| 
## |  Sao Paulo| 
## |  Maracaibo| 
## +---------------+ 

признают @ zero323