2017-02-20 53 views
1

У меня есть две точки зрения в моем ульеАльтернатива улья присоединиться

+------------+ 
| Table_1 | 
+------------+ 
| hash  | 
| campaignId | 
+------------+ 

+-----------------+ 
| Table_2   | 
+-----------------+ 
| campaignId  | 
| accountId  | 
| parentAccountID | 
+-----------------+ 

Теперь я должен получить данные «Table_1» фильтруется ACCOUNTID & parentAccountID, для которого я написал следующий запрос:

SELECT /*+ MAPJOIN(T2) */ T1.hash, COUNT(T1.campaignId) num_campaigns 
FROM Table_1 T1 
JOIN Table_2 T2 ON T1.campaignId = T2.campaignId 
WHERE (T2.accountId IN ('aid1', 'aid2') OR T2.parentAccountID IN ('aid1', 'aid2') 
GROUP BY T1.hash 

Этот запрос работает, но медленный. Есть ли какая-то лучшая альтернатива этому (JOIN)?

Я читаю Table_1 от kafka через искру.
слайдов Продолжительность 5 сек
Окно Продолжительность 2 минуты

В то время как Table_2 в RDBMS, который искры читает через JDBC, и это имеет 4500 записей.

Каждые 5 секунд насосы kafka приблизительно в 2K записываются в формате CSV.
Мне нужны данные для обработки в течение 5 секунд, но в настоящее время это занимает от 8 до 16 секунд.

В соответствии с предложений:

  1. Я отформатировал Table_1 столбцов CAMPAIGNID & хэша соответственно.
  2. Я переделал таблицу_2 по столбцам accountId & parentAccountID соответственно.
  3. Я реализовал MAPJOIN.

Но все же никакого улучшения.

ПРИМЕЧАНИЕ: Если я удалю длительность окна, процесс будет выполнен в течение времени. Может быть, из-за меньшего количества данных для обработки. Но это не требование.

+0

** (1) ** Как вы бы описали "slow"? ** (2) ** В каких объемах мы говорим? –

+0

Я использую это в искровом потоке, обрабатывая данные каждые 5 секунд. Но он занимает более 10 секунд для обработки каждой партии. (Примечание. Длительность слайда составляет 5 секунд, а продолжительность окна - 10 секунд). Каждая партия имеет около 24 тыс. Записей. –

+0

Вам нужен столбец «хэш»? –

ответ

0

С правильными индексами, следующее может быть быстрее:

SELECT T1.* 
FROM Table_1 T1 JOIN 
    Table_2 T2 
    ON T1.campaignId = T2.campaignId 
WHERE T2.accountId IN ('aid1', 'aid2') 
UNION ALL 
SELECT T1.* 
FROM Table_1 T1 JOIN 
    Table_2 T2 
    ON T1.campaignId = T2.campaignId 
WHERE T2.parentAccountID IN ('aid1', 'aid2') AND 
     T2.accountId NOT IN ('aid1', 'aid2') ; 

Первый может принять во внимание индекс на Table_2(accountId, campaignId) и второй по Table_2(parentAccountID, accountId, campaignId).

+2

Hive ............ –

0

Поскольку это Hive, о котором мы говорим, вам нужно посмотреть не только на традиционные СУБД.

  • уменьшить IO. Используйте сжатый формат столбцов для ваших данных. ORC или паркет. Не RC. Сделайте это сначала, конвертируйте свою таблицу в ORC. Ничто больше не будет вдаваться в затруднительное положение, если данные не будут сжаты и не будут столбчатыми.
  • Выберите подходящую стратегию JOIN для Hive. Этот old 2011 paper по-прежнему имеет значение.
  • Bucketize ваши таблицы
  • Используйте современный механизм выполнения: Tez или Spark.
+0

Спасибо @remus за быстрый ответ. Я читаю данные из потока кафки. данные в формате csv, которые я не могу изменить. Я использую искровой двигатель. –

+0

CSV, вы вставили лист-ручей без весла ... Проверьте, какой вкус JOIN вы получите. Возможно, вы можете использовать соединение Map, если одна из таблиц достаточно мала ('Table_1', возможно?) –

+0

BTW Если данные передаются потоком, вы должны разбить/bucketize и добавить фильтр времени. Это должно значительно снизить необходимость сканирования больших объемов. –

0

Если фильтр T2 достаточно мал, чтобы помещаться в память, попробуйте переписать запрос и переместить фильтр в подзапрос и посмотреть, будет ли соединение выполнено на mapper. Кроме того, вам не нужны столбцы из Т2, левых пола присоединиться может быть использован вместо внутреннего соединения:

set hive.cbo.enable=true; 
set hive.auto.convert.join=true; 

SELECT T1.* 
FROM Table_1 T1 
LEFT SEMI JOIN 
    (select campaignId from Table_2 T2 
     where T2.accountId IN ('aid1', 'aid2') 
      OR T2.parentAccountID IN ('aid1', 'aid2') 
    ) T2 ON T1.campaignId = T2.campaignId 
; 
0

Я бы порекомендовал вам использовать собственные преобразования Spark, а не HiveSQL:

1.read в данные Table_2 (РСУБД) в РДУ & поместить его в кэш Ex:

rddTbl1.map(campaignIdKey, (accountId, parentAccountId)) //filter out before getting into RDD if needed 
rddTbl2.cache() 

2.Now чтения Table_1 поток (Кафка)

//get campaigns of relevant account & parentaccountid 
val rddTbl2_1 = rddTbl2.filter(x => x._2._1.equals("aid1") || x._2._1.equals("aid2") || x._2._2.equals("aid1") || x._2._2.equals("aid2")) 

dstream.foreachRDD{ rddTbl1 => 
    rddTbl1.map(x => x._2.split(",")). 
      map(x => (x(1), x(2)). //campaignId, hash 
      join(rddTbl2_1). 
      map(x => (x._2._1, 1)). //get (hash,1) 
      reduceByKey(_+_). 
      foreach(println) //save it if needed 
} 
0

ОК ..

Вот что я, наконец, сделал.

Я создал хэш таблицы_2.
И затем, используя переменную широковещательной передачи, я передал эти данные каждому узлу.

Это избавит меня от хлопот.

Благодарим вас за ваше время и помощь. Happy coding :)