2016-03-08 3 views
3

Я запускаю искру 1.6 на 3 виртуальных машинах (то есть 1x master, 2x slaves), все с 4 ядрами и 16 ГБ оперативной памяти.Более одного часа для выполнения pyspark.sql.DataFrame.take (4)

Я вижу рабочих, зарегистрированных на веб-сайте искрового мастера.

Я хочу получить данные из моей базы данных Vertica, чтобы работать над ней. Поскольку мне не удавалось запускать сложные запросы, я попробовал фиктивные запросы, чтобы понять. Мы считаем здесь легкую задачу.

Мой код:

df = sqlContext.read.format('jdbc').options(url='xxxx', dbtable='xxx', user='xxxx', password='xxxx').load() 
four = df.take(4) 

И выход (примечание: я заменить @IPSLAVE ведомого VM IP: Port):

16/03/08 13:50:41 INFO SparkContext: Starting job: take at <stdin>:1 
16/03/08 13:50:41 INFO DAGScheduler: Got job 0 (take at <stdin>:1) with 1 output partitions 
16/03/08 13:50:41 INFO DAGScheduler: Final stage: ResultStage 0 (take at <stdin>:1) 
16/03/08 13:50:41 INFO DAGScheduler: Parents of final stage: List() 
16/03/08 13:50:41 INFO DAGScheduler: Missing parents: List() 
16/03/08 13:50:41 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1), which has no missing parents 
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 5.4 KB, free 5.4 KB) 
16/03/08 13:50:41 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 2.6 KB, free 7.9 KB) 
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB) 
16/03/08 13:50:41 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006 
16/03/08 13:50:41 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at take at <stdin>:1) 
16/03/08 13:50:41 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
16/03/08 13:50:41 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, @IPSLAVE, partition 0,PROCESS_LOCAL, 1922 bytes) 
16/03/08 13:50:41 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on @IPSLAVE (size: 2.6 KB, free: 511.5 MB) 
16/03/08 15:02:20 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 4299240 ms on @IPSLAVE (1/1) 
16/03/08 15:02:20 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
16/03/08 15:02:20 INFO DAGScheduler: ResultStage 0 (take at <stdin>:1) finished in 4299.248 s 
16/03/08 15:02:20 INFO DAGScheduler: Job 0 finished: take at <stdin>:1, took 4299.460581 s 

Как вы можете видеть, что это займет reaaaaaally долго время. Моя таблица на самом деле довольно большая (хранит около 220 миллионов строк, по 11 полей), но такой запрос будет выполнен мгновенно, используя «обычный» sql (например, pyodbc).

Я думаю, что я пропущен/пропущен Spark, у вас были бы идеи или советы, чтобы они работали лучше?

ответ

6

В то время как Spark поддерживает ограниченное предикатное нажатие на JDBC, все другие операции, такие как limit, group, aggregations выполняются внутренне. К сожалению, это означает, что take(4) сначала извлечет данные, а затем применит limit. Другими словами, ваша база данных будет Execute (не предполагающая никаких прогнозов AN фильтры) что-то эквивалентное:

SELECT * FROM table 

, а остальные будут обрабатываться с помощью искры. Есть несколько оптимизаций (в частности, Spark оценивает разделы итеративно, чтобы получить количество записей, запрошенных LIMIT), но все еще довольно неэффективный процесс по сравнению с оптимизацией на стороне базы данных.

Если вы хотите, чтобы нажать limit в базу данных, вам придется это сделать статический с помощью подзапроса в качестве параметра dbtable:

(sqlContext.read.format('jdbc') 
    .options(url='xxxx', dbtable='(SELECT * FROM xxx LIMIT 4) tmp', ....)) 
sqlContext.read.format("jdbc").options(Map(
    "url"  -> "xxxx", 
    "dbtable" -> "(SELECT * FROM xxx LIMIT 4) tmp", 
)) 

Пожалуйста, обратите внимание, что псевдоним в подзапросе является обязательным.

Примечание:

Такое поведение может быть улучшено в будущем, когда Source Data API v2 готов:

+0

Интересно. Я поработаю над этим и вернусь к вам, чтобы проверить anwser, если все в порядке :) Thx! – pltrdy

+0

@ zero323 Должен ли символ после 'tmp' быть' '' вместо этого? Пример Scala работает с 'sqlContext.read.jdbc (" jdbc: sqlserver: //example.com; databaseName = local; user = debug; password = debug "," (SELECT TOP 5 * FROM ExampleTable) tmp ", новый java .util.Properties) ' –

+0

@ RăzvanPanda Да, это должно быть. Исправлено, спасибо Опции работают в Scala. – zero323