2017-02-10 15 views
0

Разрешена ли обрезка разделов для кэшированных TempTables в искры apache? Если да, то как его настроить?Обрезка разделов Spark SQL для кэшированной таблицы

Мои данные - это куча показаний датчиков в разных установках, одна строка содержит имя установки, тег, метку времени и значение.

Я написал данные в формате паркетной с помощью следующих команд:

rdd.toDF("installationName", "tag", "timestamp", "value") 
    .repartition($"installationName", $"tag") 
    .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output) 

Я читал эти данные, используя следующую команду в таблицу SQL с помощью искровой HiveContext:

val parquet = hc.read.parquet("/path_to_table/tablename") 
parquet.registerTempTable("tablename") 

Теперь, если я запустите SQL-запрос в этой таблице, это приведет к обрезке разделов, как ожидалось:

hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

И запрос занимает около 8 секунд. Но если я кэшировать в памяти таблицу, а затем выполнить тот же запрос, он всегда занимает около 50 секунд:

hc.sql("CACHE TABLE tablename") 
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

Я в настоящее время используется Спарк 1.6.1.

+0

Привет, спасибо за ваш комментарий. Действительно, я делаю операцию перераспределения, прежде чем записывать данные в паркет. Я также тестировал вышеупомянутый запрос с перераспределением, и он более эффективен с временем запроса 20 с, но все же он медленнее, чем чтение из файлов паркета без кеширования. Моя цель - вообще не писать в паркетные файлы. Не могли бы вы предоставить какой-то источник - откуда вы знаете, что обрезка разделов не поддерживается после кеширования? Если вы напишете ответ здесь, я могу принять его. –

+0

Коррекция, кэширование в памяти сокращает время запроса до менее 1 секунды, что, конечно, уже приемлемо. Интересно, масштабируется ли это: это только часть моей дасты, у меня на самом деле более 200 раз больше и постоянно растет, поэтому чем больше данных у меня есть, тем больше времени просматривается во всех разделах, поэтому разделение разделов может показаться полезным здесь , –

ответ

0

Причина, по которой это происходит, объясняется тем, как работает кеш в искры.

При вызове какой-то процесс в DataFrame, РДУ или DataSet исполнения имеет план смотри ниже:

val df = sc.parallelize(1 to 10000).toDF("line") 
df.withColumn("new_line", col("line") * 10).queryExecution 

Команда queryExecution возвращение к вам план. См. Логический план ниже кода:

== Parsed Logical Plan == 
Project [*,('line * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- Scan ExistingRDD[_1#4] 

В этом случае вы можете увидеть весь процесс, который будет выполнять ваш код. При вызове cache функцию:

df.withColumn("new_line", col("line") * 10).cache().queryExecution 

Результат будет выглядеть следующим образом:

== Parsed Logical Plan == 
'Project [*,('line * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Optimized Logical Plan == 
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None 

== Physical Plan == 
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro... 

Это исполнение вернет вам исполнение с InMemoryRelation в optmized логического плана, это будет сэкономить структура данных в вашей памяти, или если ваши данные действительно большие, они будут разливаться на диск.

Время, необходимое для сохранения этого в вашем кластере, требует времени, оно будет немного медленным в первом исполнении, но когда вам нужно снова получить те же данные в другом месте, DF или RDD будут сохранены, а Spark больше не будет запрашивать выполнение.

+0

Спасибо за ваш ответ! В искробезопасном кэшировании выполняется операция, которая означает, что данные кэшируются уже при первом запуске моего запроса. Кэширование данных на самом деле занимает 500 секунд, и, действительно, после кеша, производительность запроса улучшена, теперь требуется всего 50 секунд для сканирования всех разделов. Независимо от того, сколько раз я запускаю запрос, производительность всегда примерно такая же. Ваш ответ не затрагивает мой вопрос, который касается обрезки разделов после cahcing. –