3

Я разработал простую задачу для чтения данных из MySQL и сохранения ее в Elasticsearch с помощью Spark.Сохранить Spark Dataframe в Elasticsearch - Невозможно обработать исключение типа

Вот код:

JavaSparkContext sc = new JavaSparkContext(
     new SparkConf().setAppName("MySQLtoEs") 
       .set("es.index.auto.create", "true") 
       .set("es.nodes", "127.0.0.1:9200") 
       .set("es.mapping.id", "id") 
       .set("spark.serializer", KryoSerializer.class.getName())); 

SQLContext sqlContext = new SQLContext(sc); 

// Data source options 
Map<String, String> options = new HashMap<>(); 
options.put("driver", MYSQL_DRIVER); 
options.put("url", MYSQL_CONNECTION_URL); 
options.put("dbtable", "OFFERS"); 
options.put("partitionColumn", "id"); 
options.put("lowerBound", "10001"); 
options.put("upperBound", "499999"); 
options.put("numPartitions", "10"); 

// Load MySQL query result as DataFrame 
LOGGER.info("Loading DataFrame"); 
DataFrame jdbcDF = sqlContext.load("jdbc", options); 
DataFrame df = jdbcDF.select("id", "title", "description", 
     "merchantId", "price", "keywords", "brandId", "categoryId"); 
df.show(); 
LOGGER.info("df.count : " + df.count()); 
EsSparkSQL.saveToEs(df, "offers/product"); 

Вы можете увидеть код очень прост. Он считывает данные в DataFrame, выбирает некоторые столбцы и затем выполняет count как основное действие в Dataframe. До сих пор все работает нормально.

Затем он пытается сохранить данные в Elasticsearch, но он терпит неудачу, потому что не может обрабатывать какой-либо тип. Вы можете увидеть журнал ошибок here.

Я не уверен, почему он не может справиться с этим типом. Кто-нибудь знает, почему это происходит?

Я использую Apache Спарк 1.5.0, 1.4.4 и Elasticsearch elaticsearch-Hadoop 2.1.1

EDIT:

  • Я обновил ссылку GIST с образцом набора данных наряду с исходным кодом.
  • Я также попытался использовать elasticsearch-hadoop dev builds как упоминается @costin в списке рассылки.

ответ

8

Ответ на этот вопрос был сложным, но благодаря samklr, мне удалось понять, в чем проблема.

Решение не является простым, и может рассматриваться как «ненужное» преобразование.

Сначала давайте поговорим о Сериализация.

Существует два аспекта сериализации, которые следует учитывать при сериализации данных Spark и сериализации функций Spark. В этом случае речь идет о сериализации данных и, следовательно, де-сериализации.

С точки зрения Spark, требуется только сериализация - Spark полагается по умолчанию на сериализацию Java, что удобно, но довольно неэффективно. Именно по этой причине сам Hadoop представил свой собственный механизм сериализации и свои собственные типы, а именно Writables. Таким образом, InputFormat и OutputFormats должны вернуть Writables, который из коробки Spark не понимает.

С соединителем искробезопасности искры необходимо включить другую сериализацию (Kryo), которая автоматически обрабатывает преобразование, а также делает это достаточно эффективно.

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer") 

Даже так Kryo не требует, чтобы класс реализовать конкретный интерфейс для сериализации, что означает POJOs может быть использован в РДУ без какой-либо дополнительной работы после включения Kryo сериализации.

Сказанное, @samklr указал мне, что Крио нужно зарегистрировать классы, прежде чем использовать их.

Это потому, что Kryo записывает ссылку на класс объекта, который сериализуется (одна ссылка записывается для каждого записанного объекта), который является всего лишь идентификатором целого числа, если класс был зарегистрирован, но в противном случае является полным именем класса. Spark регистрирует классы Scala и многие другие классы инфраструктуры (например, классы Avro Generic или Thrift) от вашего имени.

Регистрация классов с помощью Kryo проста. Создайте подкласс KryoRegistrator, и переопределить метод registerClasses():

public class MyKryoRegistrator implements KryoRegistrator, Serializable { 
    @Override 
    public void registerClasses(Kryo kryo) { 
     // Product POJO associated to a product Row from the DataFrame    
     kryo.register(Product.class); 
    } 
} 

Наконец, в программе драйвера, установите свойство spark.kryo.registrator в полностью квалифицированного имени класса вашей реализации KryoRegistrator:

conf.set("spark.kryo.registrator", "MyKryoRegistrator") 

Во-вторых, даже подумал, что сериализатор Kryo установлен, и класс зарегистрирован, с изменениями, внесенными в Spark 1.5, и почему-то Elasticsearch не мог де-сериализовать Dataframe, потому что он не может вывести SchemaType Dataframe в Конек тор.

Так что я должен был преобразовать Dataframe к JavaRDD

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() { 
    public Product call(Row row) throws Exception { 
     long id = row.getLong(0); 
     String title = row.getString(1); 
     String description = row.getString(2); 
     int merchantId = row.getInt(3); 
     double price = row.getDecimal(4).doubleValue(); 
     String keywords = row.getString(5); 
     long brandId = row.getLong(6); 
     int categoryId = row.getInt(7); 
     return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId); 
    } 
}); 

Теперь данные готовы записать в elasticsearch:

JavaEsSpark.saveToEs(products, "test/test"); 

Ссылки:

  • Elasticsearch-х Поддержка Apache Spark documentation.
  • Hadoop Definitive Guide, Chapter 19. Spark, ed. 4 - Том Уайт.
  • Пользователь samklr.

 Смежные вопросы

  • Нет связанных вопросов^_^