0

У меня есть случай много, как это:Установка массивов в Elasticsearch через PySpark

Пример DataFrame:

from pyspark.sql.types import * 
schema = StructType([ # schema 
    StructField("id", StringType(), True), 
    StructField("email", ArrayType(StringType()), True)]) 
df = spark.createDataFrame([{"id": "id1"}, 
          {"id": "id2", "email": None}, 
          {"id": "id3","email": ["[email protected]"]}, 
          {"id": "id4", "email": ["[email protected]", "[email protected]"]}], 
          schema=schema) 
df.show(truncate=False) 
+---+------------------------------------+ 
|id |email        | 
+---+------------------------------------+ 
|id1|null        | 
|id2|null        | 
|id3|[[email protected]]     | 
|id4|[[email protected], [email protected]]| 
+---+------------------------------------+ 

Я хочу, чтобы вставить эти данные в Elasticsearch, так, насколько я исследовал, я для преобразования в формат индексации:

def parseTest(r): 
    if r['email'] is None: 
     return r['id'],{"id":r['id']} 
    else: 
     return r['id'],{"id":r['id'],"email":r['email']} 
df2 = df.rdd.map(lambda row: parseTest(row)) 
df2.top(4) 
[('id4', {'email': ['[email protected]', '[email protected]'], 'id': 'id4'}), 
('id3', {'email': ['[email protected]'], 'id': 'id3'}), 
('id2', {'id': 'id2'}), 
('id1', {'id': 'id1'})] 

Тогда я пытаюсь вставить:

es_conf = {"es.nodes" : "node1.com,node2.com", 
      "es.resource": "index/type"} 
df2.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf) 

И я получаю это:

org.apache.spark.SparkException: Данные типа java.util.ArrayList не может быть использован

Spark v 2.1.0 
ES v 2.4.4 

Без email поля он работает Я нашел некоторое предложенное решение, используя es.output.json: true и json.dumps, но, похоже, он был для версии 5, поэтому я попробовал в другом кластере, который у меня есть с ES v5

df3 = df2.map(json.dumps) 
df3.top(4) 
['["id4", {"email": ["[email protected]", "[email protected]"], "id": "id4"}]', 
'["id3", {"email": ["[email protected]"], "id": "id3"}]', 
'["id2", {"id": "id2"}]', 
'["id1", {"id": "id1"}]'] 
es_conf2 = {"es.nodes" : "anothernode1.com,anothernode2.com", 
      "es.output.json": "true", 
      "es.resource": "index/type"} 
df3.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf2) 

Тогда я получаю:

не может быть использован

РДД элемент типа java.lang.String

Spark v 2.1.0 
ES v 5.2.0 

feelsbadman

ответ

0

я нашел еще один способ сделать ту же работу , используя метод write объекта dataframe.

Итак, после первой секции:

from pyspark.sql.types import * 
schema = StructType([ # schema 
    StructField("id", StringType(), True), 
    StructField("email", ArrayType(StringType()), True)]) 
df = spark.createDataFrame([{"id": "id1"}, 
          {"id": "id2", "email": None}, 
          {"id": "id3","email": ["[email protected]"]}, 
          {"id": "id4", "email": ["[email protected]", "[email protected]"]}], 
          schema=schema) 
df.show(truncate=False) 
+---+------------------------------------+ 
|id |email        | 
+---+------------------------------------+ 
|id1|null        | 
|id2|null        | 
|id3|[[email protected]]     | 
|id4|[[email protected], [email protected]]| 
+---+------------------------------------+ 

Вам просто нужно:

df.write\ 
    .format("org.elasticsearch.spark.sql")\ 
    .option("es.nodes","node1.com,node2.com")\ 
    .option("es.resource","index/type")\ 
    .option("es.mapping.id", "id")\ 
    .save() 

Нет необходимости превращаться в РДУ или изменять каким-либо образом.