2012-05-06 3 views
4

Я вижу много вопросов о SO об агрегировании в MongoDB, однако я еще не нашел полного решения для моего.Уникальная агрегация MongoDB по карте уменьшить

Вот пример моих данных:

{ 
    "fruits" : { 
     "apple" : "red", 
     "orange" : "orange", 
     "plum" : "purple" 
    } 
} 
{ 
    "fruits" : { 
     "apple" : "green", 
     "plum" : "purple" 
    } 
} 
{ 
    "fruits" : { 
     "apple" : "red", 
     "orange" : "yellow", 
     "plum" : "purple" 
    } 
} 

Теперь моя цель состоит в том, чтобы определить популярность каждого цвета для каждого плода, так что-то вроде этого будет сбор выхода:

{ 
    "_id" : "apple" 
    "values" : { 
     "red" : 2, 
     "green" : 1 
    } 
} 
{ 
    "_id" : "orange" 
    "values" : { 
     "orange" : 1, 
     "yellow" : 1 
    } 
} 
{ 
    "_id" : "plum" 
    "values" : { 
     "purple" : 3 
    } 
} 

Я пробовал различные функции M/R, и в конце концов они либо не работают, либо они экспоненциально длинны. В контексте примера (фрукты) у меня есть около 1000 различных фруктов и 100 000 цветов по более чем 10 000 000 документов. Мой текущий рабочий M/R это:

map = function() { 
    if (!this.fruits) return; 
    for (var fruit in this.fruits) { 
     emit(fruit, { 
      val_array: [ 
       {value: this.fruits[fruit], count: 1} 
      ] 
     }); 
    } 
}; 

reduce = function(key, values) { 
    var collection = { 
     val_array: [] 
    }; 
    var found = false; 
    values.forEach(function(map_obj) { 
     map_obj.val_array.forEach(function(value_obj) { 
      found = false; 
      // if exists in collection, inc, else add 
      collection.val_array.forEach(function(coll_obj) { 
       if (coll_obj.value == value_obj.value) { 
        // the collection already has this object, increment it 
        coll_obj.count += value_obj.count; 
        found = true; 
        return; 
       } 
      }); 
      if (!found) { 
       // the collection doesn't have this obj yet, push it 
       collection.val_array.push(value_obj); 
      } 
     }); 
    }); 
    return collection; 
}; 

Теперь, это делает работу, и на 100 записей, она занимает лишь второй или так, но время увеличивается нелинейно, поэтому 100M записи будет принимать очень долгое время. Проблема в том, что я выполняю подагрегирование бедных людей в функции уменьшения с массивом collection, поэтому мне нужно перебирать как collection, так и значения из моей функции отображения. Теперь мне просто нужно выяснить, как это сделать эффективно (даже если это требует нескольких сокращений). Любые предложения приветствуются!


EDIT За неимением лучшего места, чтобы разместить его, вот мое решение.
Во-первыхи, я создал файл с именем mr.js:

map = function() { 
    if (!this.fruits) return; 
    var skip_fruits = { 
     'Watermelon':1, 
     'Grapefruit':1, 
     'Tomato':1 // yes, a tomato is a fruit 
    } 
    for (var fruit in this.fruits) { 
     if (skip_fruits[fruit]) continue; 
     var obj = {}; 
     obj[this.fruits[fruit]] = 1; 
     emit(fruit, obj); 
    } 
}; 

reduce = function(key, values) { 
    var out_values = {}; 
    values.forEach(function(v) { 
     for(var k in v) { // iterate values 
      if (!out_values[k]) { 
       out_values[k] = v[k]; // init missing counter 
      } else { 
       out_values[k] += v[k]; 
      } 
     } 
    }); 
    return out_values; 
}; 

var in_coll = "fruit_repo"; 
var out_coll = "fruit_agg_so"; 
var total_docs = db[in_coll].count(); 
var page_size = 100000; 
var pages = Math.floor(total_docs/page_size); 
print('Starting incremental MR job with '+pages+' pages'); 
db[out_coll].drop(); 
for (var i=0; i<pages; i++) { 
    var skip = page_size * i; 
    print("Calculating page limits for "+skip+" - "+(skip+page_size-1)+"..."); 
    var start_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip).limit(1)[0].date; 
    var end_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip+page_size-1).limit(1)[0].date; 
    var mr_command = { 
     mapreduce: in_coll, 
     map: map, 
     reduce: reduce, 
     out: {reduce: out_coll}, 
     sort: {date: 1}, 
     query: { 
      date: { 
       $gte: start_date, 
       $lt: end_date 
      } 
     }, 
     limit: (page_size - 1) 
    }; 
    print("Running mapreduce for "+skip+" - "+(skip+page_size-1)); 
    db[in_coll].runCommand(mr_command); 
} 

этой файловой перебирает всю мою коллекцию, постепенно карты/уменьшения 100k документов (отсортированных по date, которые должны иметь индекс!) В то время, и сокращение их в единую сборку. Он используется следующим образом: mongo db_name mr.js.

Затем, через пару часов, у меня есть коллекция со всей информацией. Для того, чтобы выяснить, какие фрукты есть большинство цветов, я использую это из Монго оболочки, чтобы распечатать топ-25:

// Show number of number of possible values per key 
var keys = []; 
for (var c = db.fruit_agg_so.find(); c.hasNext();) { 
    var obj = c.next(); 
    if (!obj.value) break; 
    var len=0;for(var l in obj.value){len++;} 
    keys.push({key: obj['_id'], value: len}); 
} 
keys.sort(function(a, b){ 
    if (a.value == b.value) return 0; 
    return (a.value > b.value)? -1: 1; 
}); 
for (var i=0; i<20; i++) { 
    print(keys[i].key+':'+keys[i].value); 
} 

Действительно классная вещь об этом подходе является то, что, так как это инкрементальный, я могу работать с выходом данных во время работы mapreduce.

ответ

8

Кажется, что вам действительно не нужно val_array. Почему бы не использовать простой хэш? Попробуйте это:

map = function() { 
    if (!this.fruits) return; 
    for (var fruit in this.fruits) { 
     emit(fruit, 
      {this.fruits[fruit]: 1}); 
    } 
}; 

reduce = function(key, values) { 
    var colors = {}; 

    values.forEach(function(v) { 
    for(var k in v) { // iterate colors 
     if(!colors[k]) // init missing counter 
     colors[k] = 0 

     color[k] += v[k]; 
    } 
    }); 

    return colors; 
} 
+0

Ничего себе, я действительно думал, что один, не я! Это действительно делает именно то, что я хотел. Я тестировал его с 100, 1000 и 100 000 записями, и он работает около 20 к/сек для каждого набора (видимо, линейный по этим размерам). Сейчас я запускаю полные 10M-записи, и я вижу, что по мере того, как партии отображаемых данных становятся больше, для их сокращения требуется значительно больше времени (объект 'colors' должен расти):' "secs_running": 488, msg ":" m/r: (1/3) испускать фазу 383999/10752083 3% ". – SteveK

+0

Btw, я не мог использовать 'emit (fruit, {this.fruits [fruit]: 1});' потому что ключ был динамически сгенерирован, поэтому я использовал этот JS-хак вместо: 'var obj = {}; obj [this.fruits [fruit]] = 1; emit (фрукты, obj); '. – SteveK

+0

Тогда я предлагаю попробовать частичные задания. То есть обрабатывать документы партиями по 100 тыс. (Или что-то еще), а затем уменьшать их в одном конечном задании. Это может быть сложно реализовать, поэтому, если это одноразовый, я бы не стал беспокоиться. :) –

0

Я сожалею, чтобы сказать вам это, но основа MongoDB MapReduce невероятно медленно, и, вероятно, продолжит быть так для «долгое время» (я бы не стал ожидать улучшение быть по их дорожной карте).

Просто, мой ответ был бы, что я бы не сделать это с Монго-MapReduce, но вместо того, чтобы сосредоточиться на его реализации с помощью нового Aggregation Framework: http://docs.mongodb.org/manual/reference/aggregation/

или бег Hadoop на вершине: http://www.slideshare.net/spf13/mongodb-and-hadoop (хорошее и простое введение)

У меня также были проблемы с медленным использованием MongoDB при использовании реализованной функциональности MapReduce, и мой вывод состоит в том, что даже при выполнении самых простых задач он даже не приближается к два решения выше, когда дело доходит до производительности. Вы можете легко обработать> 1M docs/sec (или даже больше) на товарном оборудовании, используя новую структуру агрегации.