Я вижу много вопросов о SO об агрегировании в MongoDB, однако я еще не нашел полного решения для моего.Уникальная агрегация MongoDB по карте уменьшить
Вот пример моих данных:
{
"fruits" : {
"apple" : "red",
"orange" : "orange",
"plum" : "purple"
}
}
{
"fruits" : {
"apple" : "green",
"plum" : "purple"
}
}
{
"fruits" : {
"apple" : "red",
"orange" : "yellow",
"plum" : "purple"
}
}
Теперь моя цель состоит в том, чтобы определить популярность каждого цвета для каждого плода, так что-то вроде этого будет сбор выхода:
{
"_id" : "apple"
"values" : {
"red" : 2,
"green" : 1
}
}
{
"_id" : "orange"
"values" : {
"orange" : 1,
"yellow" : 1
}
}
{
"_id" : "plum"
"values" : {
"purple" : 3
}
}
Я пробовал различные функции M/R, и в конце концов они либо не работают, либо они экспоненциально длинны. В контексте примера (фрукты) у меня есть около 1000 различных фруктов и 100 000 цветов по более чем 10 000 000 документов. Мой текущий рабочий M/R это:
map = function() {
if (!this.fruits) return;
for (var fruit in this.fruits) {
emit(fruit, {
val_array: [
{value: this.fruits[fruit], count: 1}
]
});
}
};
reduce = function(key, values) {
var collection = {
val_array: []
};
var found = false;
values.forEach(function(map_obj) {
map_obj.val_array.forEach(function(value_obj) {
found = false;
// if exists in collection, inc, else add
collection.val_array.forEach(function(coll_obj) {
if (coll_obj.value == value_obj.value) {
// the collection already has this object, increment it
coll_obj.count += value_obj.count;
found = true;
return;
}
});
if (!found) {
// the collection doesn't have this obj yet, push it
collection.val_array.push(value_obj);
}
});
});
return collection;
};
Теперь, это делает работу, и на 100 записей, она занимает лишь второй или так, но время увеличивается нелинейно, поэтому 100M записи будет принимать очень долгое время. Проблема в том, что я выполняю подагрегирование бедных людей в функции уменьшения с массивом collection
, поэтому мне нужно перебирать как collection
, так и значения из моей функции отображения. Теперь мне просто нужно выяснить, как это сделать эффективно (даже если это требует нескольких сокращений). Любые предложения приветствуются!
EDIT За неимением лучшего места, чтобы разместить его, вот мое решение.
Во-первыхи, я создал файл с именем
mr.js
:
map = function() {
if (!this.fruits) return;
var skip_fruits = {
'Watermelon':1,
'Grapefruit':1,
'Tomato':1 // yes, a tomato is a fruit
}
for (var fruit in this.fruits) {
if (skip_fruits[fruit]) continue;
var obj = {};
obj[this.fruits[fruit]] = 1;
emit(fruit, obj);
}
};
reduce = function(key, values) {
var out_values = {};
values.forEach(function(v) {
for(var k in v) { // iterate values
if (!out_values[k]) {
out_values[k] = v[k]; // init missing counter
} else {
out_values[k] += v[k];
}
}
});
return out_values;
};
var in_coll = "fruit_repo";
var out_coll = "fruit_agg_so";
var total_docs = db[in_coll].count();
var page_size = 100000;
var pages = Math.floor(total_docs/page_size);
print('Starting incremental MR job with '+pages+' pages');
db[out_coll].drop();
for (var i=0; i<pages; i++) {
var skip = page_size * i;
print("Calculating page limits for "+skip+" - "+(skip+page_size-1)+"...");
var start_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip).limit(1)[0].date;
var end_date = db[in_coll].find({},{date:1}).sort({date:1}).skip(skip+page_size-1).limit(1)[0].date;
var mr_command = {
mapreduce: in_coll,
map: map,
reduce: reduce,
out: {reduce: out_coll},
sort: {date: 1},
query: {
date: {
$gte: start_date,
$lt: end_date
}
},
limit: (page_size - 1)
};
print("Running mapreduce for "+skip+" - "+(skip+page_size-1));
db[in_coll].runCommand(mr_command);
}
этой файловой перебирает всю мою коллекцию, постепенно карты/уменьшения 100k документов (отсортированных по date
, которые должны иметь индекс!) В то время, и сокращение их в единую сборку. Он используется следующим образом: mongo db_name mr.js
.
Затем, через пару часов, у меня есть коллекция со всей информацией. Для того, чтобы выяснить, какие фрукты есть большинство цветов, я использую это из Монго оболочки, чтобы распечатать топ-25:
// Show number of number of possible values per key
var keys = [];
for (var c = db.fruit_agg_so.find(); c.hasNext();) {
var obj = c.next();
if (!obj.value) break;
var len=0;for(var l in obj.value){len++;}
keys.push({key: obj['_id'], value: len});
}
keys.sort(function(a, b){
if (a.value == b.value) return 0;
return (a.value > b.value)? -1: 1;
});
for (var i=0; i<20; i++) {
print(keys[i].key+':'+keys[i].value);
}
Действительно классная вещь об этом подходе является то, что, так как это инкрементальный, я могу работать с выходом данных во время работы mapreduce.
Ничего себе, я действительно думал, что один, не я! Это действительно делает именно то, что я хотел. Я тестировал его с 100, 1000 и 100 000 записями, и он работает около 20 к/сек для каждого набора (видимо, линейный по этим размерам). Сейчас я запускаю полные 10M-записи, и я вижу, что по мере того, как партии отображаемых данных становятся больше, для их сокращения требуется значительно больше времени (объект 'colors' должен расти):' "secs_running": 488, msg ":" m/r: (1/3) испускать фазу 383999/10752083 3% ". – SteveK
Btw, я не мог использовать 'emit (fruit, {this.fruits [fruit]: 1});' потому что ключ был динамически сгенерирован, поэтому я использовал этот JS-хак вместо: 'var obj = {}; obj [this.fruits [fruit]] = 1; emit (фрукты, obj); '. – SteveK
Тогда я предлагаю попробовать частичные задания. То есть обрабатывать документы партиями по 100 тыс. (Или что-то еще), а затем уменьшать их в одном конечном задании. Это может быть сложно реализовать, поэтому, если это одноразовый, я бы не стал беспокоиться. :) –