2013-05-04 2 views
2

У меня есть две версии (старые/новые) таблицы базы данных с около 100 000 000 записей. Они находятся в файлах:Mapreduce Table Diff

trx-old 
trx-new 

Структура:

id date amount memo 
1 5/1  100 slacks 
2 5/1  50 wine 

идентификатор простой первичный ключ, остальные поля не являются ключевыми. Я хочу сгенерировать три файла:

trx-removed (ids of records present in trx-old but not in trx-new) 
trx-added (records from trx-new whose ids are not present in trx-old) 
trx-changed (records from trx-new whose non-key values have changed since trx-old) 

Мне нужно сделать эту операцию каждый день в коротком окне партии. И на самом деле, мне нужно сделать это для нескольких таблиц и нескольких схем (создавая три файла для каждого), поэтому фактическое приложение немного более активно. Но я думаю, что этот пример отражает суть проблемы.

Это похоже на очевидное приложение для mapreduce. Я никогда не писал приложение mapreduce, у меня есть следующие вопросы:

  1. Есть ли какое-то приложение EMR, которое уже делает это?
  2. Есть ли очевидное решение Pig или Cascading?
  3. есть ли какой-нибудь другой пример с открытым исходным кодом, который очень близок к этому?

PS Я видел вопрос diff between tables, но решения там не выглядели масштабируемыми.

PPS Вот маленький рубин игрушка, которая демонстрирует алгоритм: Ruby dbdiff

+0

2. Да, решение Pig по крайней мере для добавленных и удаленных частей является явно очевидным «LEFT OUTER JOIN» и «FILTER» на основе того, является ли объединенный столбец «нулевым». Что касается «измененного», то моя лучшая догадка - это внутренний 'JOIN' и фильтр, основанный на том, отличаются ли поля. – TC1

ответ

2

Я думаю, проще всего было бы просто написать свою собственную работу, в основном потому, что вы хотите использовать MultipleOutputs написать три отдельных файлов с одного шага уменьшения, когда типичный редуктор записывает только в один файл. Вам нужно будет использовать MultipleInputs для указания картографа для каждой таблицы.

+0

От взгляда на все решения я задаюсь вопросом, лучше ли использовать консервированную CoGroup и, возможно, некоторую избыточную фильтрацию, или же лучше вручную обработать решение, которое пишет на MultipleOutputs «на лету». –

1

Что приходит мне на ум, что:

Рассмотрим ваши таблицы таковы: элементы

Table_old 
1 other_columns1 
2 other_columns2 
3 other_columns3 

Table_new 
2 other_columns2 
3 other_columns3 
4 other_columns4 

Append table_old "А" и элементы table_new "B".

При объединении обоих файлов и если элемент существует на первый файл, а не во втором файле это удаляется

table_merged 
1a other_columns1 
2a other_columns2 
2b other_columns2 
3a other_columns3 
3b other_columns3 
4a other_columns4 

Из этого файла вы можете сделать ваши операции легко.

Также предположим, что ваш идентификатор - это n цифр, и у вас есть 10 кластеров + 1 мастер. Ваш ключ будет 1-й цифрой id, поэтому вы разделите данные на кластеры равномерно. Вы бы группировали + разбиение на разделы, чтобы ваши данные были отсортированы.

Пример,

table_old 
1...0 data 
1...1 data 
2...2 data 

table_new 
1...0 data 
2...2 data 
3...2 data 

Ваш ключ первая цифра, и вы группирования в соответствии с этой цифрой, и ваш раздел в соответствии с остальной частью идентификатора.Тогда ваши данные пойдут на ваш сайт, так как

worker1 
1...0b data 
1...0a data 
1...1a data 

worker2 
2...2a data 
2...2b data and so on. 

Обратите внимание, что a, b не нужно сортировать.

EDIT Merge будет так:

FileInputFormat.addInputPath(job, new Path("trx-old")); 
FileInputFormat.addInputPath(job, new Path("trx-new")); 

MR получит два входных и будут объединены в два файла,

Для добавляющим части, вы должны создать еще два рабочих мест до Main MR, которые будут иметь только Map. Первый Map будет append "a" каждому элементу в первом списке, а второй будет append "b" элементам второго списка. Третья работа (та, которую мы используем сейчас/основная карта) будет только уменьшать работу, чтобы собрать их. Таким образом, у вас будет Map-Map-Reduce.

Добавления данных может быть сделано как то

//you have key:Text 
new Text(String.valueOf(key.toString()+"a")) 

, но я думаю, что может быть различными способами добавления, некоторые из них могут быть более эффективными в (text hadoop)

Надеется, что это будет полезно,

+0

Это было полезно, и я думаю, что получаю суть. Но вы принимаете некоторые знания MR, которых я не обладаю. В частности, я точно не знаю, как «добавить», а затем «слить», как вы предлагаете на ранней стадии. –

+0

Я редактировал свое сообщение. Я могу сказать, что мой код стал сложнее, если есть более простой готовый код или готовая библиотека, связанная с тем, что вам лучше использовать его. – smttsp

1

Это похоже на идеальную проблему для решения каскадных задач. Вы упоминали, что вы никогда не писали приложение MR, и если намерение - начать быстро (при условии, что вы знакомы с Java), то Cascading - это способ пойти IMHO. Через секунду я коснусь этого.

Можно использовать Pig или Hive, но они не так гибки, если вы хотите выполнить дополнительный анализ этих столбцов или изменить схемы, поскольку вы можете построить свою схему на лету в Cascading, прочитав из заголовков столбцов или из файла сопоставления, созданного для обозначения схемы.

В Cascading вы бы:

  1. Настройка ваших входящих Taps: Нажмите trxOld и Tap trxNew (Они указывают на исходные файлы)
  2. Подключите отводы к Pipes: Труба oldPipe и трубы newPipe
  3. Настройте свой исходящий Taps: Нажмите trxRemoved, нажмите trxAdded и нажмите trxChanged
  4. Создайте свой анализ труб (здесь происходит забава (повреждение))

TRX-Удалено: TRX добавленной

Pipe trxOld = new Pipe ("old-stuff"); 
Pipe trxNew = new Pipe ("new-stuff"); 
//smallest size Pipe on the right in CoGroup 
Pipe oldNnew = new CoGroup("old-N-new", trxOld, new Fields("id1"), 
             trxNew, new Fields("id2"), 
             new OuterJoin()); 

внешнего соединение дает нам NULLS, где идентификаторы, отсутствующие в других трубах (исходные данные), поэтому мы можем использовать FilterNotNull или FilterNull в следующая логика, чтобы получить окончательные каналы, которые мы затем подключим к Tap trxRemoved и Tap trxAdded соответственно.

TRX-изменил

Здесь я бы первым конкатенировать поля, которые вы ищете изменения в использовании FieldJoiner затем использовать ExpressionFilter, чтобы дать нам зомби (потому что они изменились), что-то вроде:

Pipe valueChange = new Pipe("changed"); 
valueChange = new Pipe(oldNnew, new Fields("oldValues", "newValues"), 
      new ExpressionFilter("oldValues.equals(newValues)", String.class), 
      Fields.All); 

Что это такое, он отфильтровывает поля с одинаковым значением и сохраняет различия. Более того, если выражение выше верно, оно избавляется от этой записи. Наконец, подключите ваш канал valueChange к вашему Tap trxChanged, и у вас будет три выхода со всеми данными, которые вы ищете, с кодом, который позволяет вносить некоторые дополнительные анализы в ползание.

+0

Это выглядит довольно близко к реальному решению. CoGroup - это просто волшебное внешнее соединение, которое мне нужно! Это заставляет меня задаться вопросом, что такое производительность CoGroup, и может ли это выиграть от того, что мои таблицы предварительно отсортированы. –

+0

Производительность соединений в MapReduce всегда должна быть медленной (технический термин). Рамка не способна ускорить объединение (просто спросите улей PIG), и вообще Hadoop следует рассматривать как 18-х колесный погрузчик, тянущий тонны веса медленно, но лучше, чем использование 1 мужественного автомобиля. Теперь, к вашему второму вопросу о предварительных данных, я не знаю о его влиянии на оптимизацию. Я знаю, что CoGroup сортирует групповой ключ по их естественному порядку, который говорит мне, что он встроен. Я вынужден полагать, что presorted будет работать быстрее, чем несортировано в трубе GroupBy. – Engineiro

1

Как предложил @ChrisGerken, вам нужно будет использовать MultipleOutputs и MultipleInputs, чтобы генерировать несколько выходных файлов и связывать пользовательские сопоставления с каждым типом входного файла (старым/новым).

Преобразователь выход будет:

  • ключ: первичный ключ (идентификатор)
  • значение: запись из входного файла с дополнительным флагом (новый/старый в зависимости от входа)

редуктор будет перебирать все записи R для каждой клавиши и выхода:

  • в удаленный файл: если существует только запись с флагом old.
  • в добавленный файл: если существует только запись с флагом new.
  • изменение файла: если записи в R отличается.

Поскольку этот алгоритм масштабируется с количеством редукторов, вам, скорее всего, понадобится второе задание, которое объединит результаты в один файл для окончательного вывода.

+0

Нет ответа, связанного с HOW ?. Вы только что сказали, что запись не существует во втором файле, считая ее удаленной. Это существует в вопросе. – smttsp

+0

@smttsp: Надеюсь, это прояснит ситуацию. Редуктор получит следующий вход и отметит его как удаленный: '(1, {old1})', добавил: '(2, {new2})', изменено: '(3, {old3, new3})'. – harpun

+0

Интересно, как решение @ Engineiro (с использованием CoGroup) сравнится с этим решением по производительности. –

 Смежные вопросы

  • Нет связанных вопросов^_^