Я разработал простую задачу для чтения данных из MySQL и сохранения ее в Elasticsearch с помощью Spark.Сохранить Spark Dataframe в Elasticsearch - Невозможно обработать исключение типа
Вот код:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
Вы можете увидеть код очень прост. Он считывает данные в DataFrame, выбирает некоторые столбцы и затем выполняет count
как основное действие в Dataframe. До сих пор все работает нормально.
Затем он пытается сохранить данные в Elasticsearch, но он терпит неудачу, потому что не может обрабатывать какой-либо тип. Вы можете увидеть журнал ошибок here.
Я не уверен, почему он не может справиться с этим типом. Кто-нибудь знает, почему это происходит?
Я использую Apache Спарк 1.5.0, 1.4.4 и Elasticsearch elaticsearch-Hadoop 2.1.1
EDIT:
- Я обновил ссылку GIST с образцом набора данных наряду с исходным кодом.
- Я также попытался использовать elasticsearch-hadoop dev builds как упоминается @costin в списке рассылки.