Мы пытаемся интегрировать кластер ES (1.7.2, 4 узла) с Spark (1.5.1, скомпилированный с улей и хаопом с scala 2.11, 4 узла кластера), есть hdfs, входящие в уравнение (hadoop 2.7,4 node) и бережливый сервер jdbc и elasticsearch-hadoop-2.2.0-m1.jarElasticsearch-hadoop & Elasticsearch-spark sql - Отслеживание операторов сканирования и прокрутки
Таким образом, существует два способа выполнения инструкции для ES.
Спарк SQL с Скале
val conf = new SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g") conf.set("spark.logConf", "true") conf.set("spark.cores.max","20") conf.set("es.index.auto.create", "false") conf.set("es.batch.size.bytes", "100mb") conf.set("es.batch.size.entries", "10000") conf.set("es.scroll.size", "10000") conf.set("es.nodes", "node2:39200") conf.set("es.nodes.discovery","true") conf.set("pushdown", "true") sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar") sc.addJar("executorLib/scala-library-2.10.1.jar") sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')") val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'") .....
сервера Бережливость (код выполняется на искру)
.... polledDataSource = new ComboPooledDataSource() polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver") polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001") polledDataSource.setMaxPoolSize(5) dbConnection = polledDataSource.getConnection dbStatement = dbConnection.createStatement val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')") dbStatement.setFetchSize(50000) dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6") .....
я следующие вопросы и в связи с тем, что они связаны, Я решил упаковать их в один вопрос в стек:
Похоже, что метод, использующий Spark SQL, поддерживает pushdown того, что идет за WHERE (независимо от того, указан ли es.query или нет), время выполнения является одинаковым и приемлемым. Но номер решения 1 определенно не поддерживает pushgow агрегирующих функций, то есть представленный счетчик (*) не выполняется на стороне ES, но только после того, как все данные получены - ES возвращает строки, а Spark SQL подсчитывает их. Пожалуйста, подтвердите, если это правильное поведение
- номер
Решение один ведет себя странно в том, передается ли магазинный истинным или ложным, время равно
Решение номер 2, кажется, не поддерживает не раскрывающееся вниз, это не имеет значения, в каком Я пытаюсь указать суб-запрос, будь то часть определения таблицы или предложение WHERE, похоже, что он просто извлекает весь огромный индекс, а затем делает математику на нем. Является ли это так, что бережливость улья не в состоянии сделать раскрывающееся вниз на ES
Я хотел бы, чтобы отслеживать запросы в упругом поиска, я установить следующее:
//logging.yml index.search.slowlog: TRACE, index_search_slow_log_file index.indexing.slowlog: TRACE, index_indexing_slow_log_file additivity: index.search.slowlog: true index.indexing.slowlog: true
Все index.search .slowlog.threshold.query, index.search.slowlog.threshold.fetch и даже index.indexing.slowlog.threshold.index установлены в 0ms. И я вижу в файлах slowlog общие утверждения, выполненные из смысла (так оно и работает). Но я не вижу операторов Spark SQL или бережливость, выполняемых против ES. Я предполагаю, что это сканирование & прокрутка, потому что если я выполняю сканирование &, прокрутите список от смысла, они также не будут зарегистрированы. Возможно ли как-то отслеживать сканирование & прокручивать по бокам ES?
Я не смог найти что-нибудь о 'elasticsearch-spark'. Я думал, что «elasticsearch-hadoop» включает поддержку Spark? – Sid
Я не понимаю ваш вопрос. – eliasah
Есть ли загружаемый проект под названием «elasticsearch-spark», например «elasticsearch-hadoop»? Я не нашел ничего на github. – Sid