4

Мы пытаемся интегрировать кластер ES (1.7.2, 4 узла) с Spark (1.5.1, скомпилированный с улей и хаопом с scala 2.11, 4 узла кластера), есть hdfs, входящие в уравнение (hadoop 2.7,4 node) и бережливый сервер jdbc и elasticsearch-hadoop-2.2.0-m1.jarElasticsearch-hadoop & Elasticsearch-spark sql - Отслеживание операторов сканирования и прокрутки

Таким образом, существует два способа выполнения инструкции для ES.

  1. Спарк SQL с Скале

    val conf = new SparkConf().setAppName("QueryRemoteES").setMaster("spark://node1:37077").set("spark.executor.memory","2g") 
    conf.set("spark.logConf", "true") 
    conf.set("spark.cores.max","20") 
    conf.set("es.index.auto.create", "false") 
    conf.set("es.batch.size.bytes", "100mb") 
    conf.set("es.batch.size.entries", "10000") 
    conf.set("es.scroll.size", "10000") 
    conf.set("es.nodes", "node2:39200") 
    conf.set("es.nodes.discovery","true") 
    conf.set("pushdown", "true") 
    
    sc.addJar("executorLib/elasticsearch-hadoop-2.2.0-m1.jar") 
    sc.addJar("executorLib/scala-library-2.10.1.jar") 
    
    sqlContext.sql("CREATE TEMPORARY TABLE geoTab USING org.elasticsearch.spark.sql OPTIONS (resource 'geo_2/kafkain')") 
    
    val all: DataFrame = sqlContext.sql("SELECT count(*) FROM geoTab WHERE transmittersID='262021306841042'") 
    ..... 
    
  2. сервера Бережливость (код выполняется на искру)

    .... 
    polledDataSource = new ComboPooledDataSource() 
    polledDataSource.setDriverClass("org.apache.hive.jdbc.HiveDriver") 
    polledDataSource.setJdbcUrl("jdbc:hive2://node1:30001") 
    polledDataSource.setMaxPoolSize(5) 
    dbConnection = polledDataSource.getConnection 
    dbStatement = dbConnection.createStatement 
    
    val dbResult = dbStatement.execute("CREATE TEMPORARY EXTERNAL TABLE IF NOT EXISTS geoDataHive6(transmittersID STRING,lat DOUBLE,lon DOUBLE) STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler' TBLPROPERTIES('es.resource' = 'geo_2/kafkain','es.query'='{\"query\":{\"term\":{\"transmittersID\":\"262021306841042\"}}}','es.nodes'='node2','es.port'='39200','es.nodes.discovery' = 'false','es.mapping.include' = 'trans*,point.*','es.mapping.names' = 'transmittersID:transmittersID,lat:point.lat,lon:point.lon','pushdown' = 'true')") 
    
    dbStatement.setFetchSize(50000) 
    dbResultSet = dbStatement.executeQuery("SELECT count(*) FROM geoDataHive6") 
    ..... 
    

я следующие вопросы и в связи с тем, что они связаны, Я решил упаковать их в один вопрос в стек:

  1. Похоже, что метод, использующий Spark SQL, поддерживает pushdown того, что идет за WHERE (независимо от того, указан ли es.query или нет), время выполнения является одинаковым и приемлемым. Но номер решения 1 определенно не поддерживает pushgow агрегирующих функций, то есть представленный счетчик (*) не выполняется на стороне ES, но только после того, как все данные получены - ES возвращает строки, а Spark SQL подсчитывает их. Пожалуйста, подтвердите, если это правильное поведение

  2. номер

    Решение один ведет себя странно в том, передается ли магазинный истинным или ложным, время равно

  3. Решение номер 2, кажется, не поддерживает не раскрывающееся вниз, это не имеет значения, в каком Я пытаюсь указать суб-запрос, будь то часть определения таблицы или предложение WHERE, похоже, что он просто извлекает весь огромный индекс, а затем делает математику на нем. Является ли это так, что бережливость улья не в состоянии сделать раскрывающееся вниз на ES

  4. Я хотел бы, чтобы отслеживать запросы в упругом поиска, я установить следующее:

    //logging.yml 
    index.search.slowlog: TRACE, index_search_slow_log_file 
    index.indexing.slowlog: TRACE, index_indexing_slow_log_file 
    
    additivity: 
        index.search.slowlog: true 
        index.indexing.slowlog: true 
    

Все index.search .slowlog.threshold.query, index.search.slowlog.threshold.fetch и даже index.indexing.slowlog.threshold.index установлены в 0ms. И я вижу в файлах slowlog общие утверждения, выполненные из смысла (так оно и работает). Но я не вижу операторов Spark SQL или бережливость, выполняемых против ES. Я предполагаю, что это сканирование & прокрутка, потому что если я выполняю сканирование &, прокрутите список от смысла, они также не будут зарегистрированы. Возможно ли как-то отслеживать сканирование & прокручивать по бокам ES?

ответ

2
  1. Насколько я знаю, это ожидаемое поведение. Все источники, которые я знаю, ведут себя точно так же, и интуитивно это имеет смысл. SparkSQL предназначен для аналитических запросов и имеет смысл извлекать данные, кэшировать и обрабатывать локально. См. Также Does spark predicate pushdown work with JDBC?

  2. Я не думаю, что conf.set("pushdown", "true") не имеет никакого эффекта. Если вы хотите настроить конкретные настройки подключения, он должен быть передан как карта OPTION, как во втором случае.Использование префикса es должно также работать

  3. Это действительно странно. Martin Senne сообщил a similar issue с PostgreSQL, но я не смог воспроизвести это.

1

После обсуждения я имел с Costin Л на группе elasticsearch обсуждения, он отметил следующее и я должен поделиться им с вами:

Есть целый ряд вопросов с вашей установкой:

  1. Вы упомянули использование Scala 2.11, но используете Scala 2.10. Обратите внимание, что если вы хотите выбрать версию Scala, elasticsearch-spark следует использовать, elasticsearch-hadoop предоставляет двоичные файлы только для Scala 2.10.

  2. Функциональность pushdown доступна только через Spark DataSource. Если вы не используете этот тип декларации, pushdown не передается ES (так работает Spark). Следовательно, объявление pushdown не имеет значения.

  3. Обратите внимание, что, как все Титулы в ES-Hadoop начинаются с es. - исключение составляет лишь pushdown и location, которые являются Спарк DataSource специфического (следующие условными свеч, как это искра специфических особенностей в выделенном DS)

  4. Использование временной таблицы действительно считается источником данных, но вам нужно использовать pushdown. Если вы этого не сделаете, он активируется по умолчанию, поэтому вы не видите разницы между вашими прогонами; вы не изменили какой-либо соответствующий параметр.

  5. Счетчик и другие агрегаты не сбрасываются Spark. Возможно, в будущем может быть что-то, по словам команды Databricks, но в настоящее время ничего нет. Для подсчета вы можете сделать быстрый вызов, используя dataFrame.rdd.esCount. Но это исключительный случай.

  6. Я не уверен, действительно ли сервер Thrift считается источником данных, поскольку он загружает данные из Hive. Вы можете дважды проверить это, включив регистрацию в пакете org.elasticsearch.hadoop.spark в DEBUG. Вы должны увидеть, переводит ли SQL в DSL.

Надеюсь, это поможет!

+0

Я не смог найти что-нибудь о 'elasticsearch-spark'. Я думал, что «elasticsearch-hadoop» включает поддержку Spark? – Sid

+0

Я не понимаю ваш вопрос. – eliasah

+0

Есть ли загружаемый проект под названием «elasticsearch-spark», например «elasticsearch-hadoop»? Я не нашел ничего на github. – Sid