Групповая агрегация Mongodb $group, ограничить длину массива

Я хочу сгруппировать все документы в соответствии с полем, но ограничить количество сгруппированных документов для каждого значения.

Каждое сообщение имеет идентификатор беседы. Мне нужно получить 10 или меньшее количество сообщений для каждого идентификатора talk_ID.

Я могу группироваться в соответствии со следующей командой, но не могу понять, как ограничить количество сгруппированных документов, кроме нарезки результатов Message.aggregate({'$group':{_id:'$conversation_ID',msgs:{'$push':{msgid:'$_id'}}}})

Как ограничить длину массива msgs для каждого параметра talk_ID до 10?

Ответы

Ответ 1

Современные

От MongoDB 3.6 существует "новый" подход к этому, используя $lookup для выполнения "самостоятельного объединения" во многом так же, как показано на рисунке ниже.

Так как в этой версии вы можете указать аргумент "pipeline" $lookup в качестве источника для "join", это по существу означает, что вы можете использовать $match и $limit для сбора и "ограничения" записей для массива:

db.messages.aggregate([
  { "$group": { "_id": "$conversation_ID" } },
  { "$lookup": {
    "from": "messages",
    "let": { "conversation": "$_id" },
    "pipeline": [
      { "$match": { "$expr": { "$eq": [ "$conversation_ID", "$$conversation" ] } }},
      { "$limit": 10 },
      { "$project": { "_id": 1 } }
    ],
    "as": "msgs"
  }}
])

Вы можете дополнительно добавить дополнительную проекцию после $lookup, чтобы сделать элементы массива просто значениями, а не документами с _id, но основной результат можно сделать, просто выполнив описанное выше.

По-прежнему существует выдающийся SERVER-9277, который фактически запрашивает "ограничение на нажатие" напрямую, но используя $lookup таким образом является жизнеспособной альтернативой в промежуточный период.

ПРИМЕЧАНИЕ. Также существует $slice, который был введен после написания исходного ответа и упоминается в "выдающаяся проблема JIRA" в исходном контенте. Хотя вы можете получить тот же результат с небольшими результирующими наборами, он включает в себя все еще "толкание всего" в массив, а затем позднее ограничение выхода конечного массива на нужную длину.

Итак, главное отличие и почему это вообще непрактично для $slice для больших результатов. Но, конечно, его можно использовать поочередно в тех случаях, когда оно есть.

Есть несколько дополнительных сведений о значениях группы mongodb по нескольким полям об альтернативном использовании.


Оригинальные

Как было сказано ранее, это не невозможно, а, безусловно, ужасная проблема.

На самом деле, если ваша основная проблема заключается в том, что ваши результирующие массивы будут исключительно большими, тогда вам лучше всего представить для каждого отдельного "chat_ID" как индивидуальный запрос, а затем объединить результаты. В очень синтаксисе MongoDB 2.6, который может нуждаться в некоторой настройке в зависимости от того, какова ваша реализация на самом деле:

var results = [];
db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID"
    }}
]).forEach(function(doc) {
    db.messages.aggregate([
        { "$match": { "conversation_ID": doc._id } },
        { "$limit": 10 },
        { "$group": {
            "_id": "$conversation_ID",
            "msgs": { "$push": "$_id" }
        }}
    ]).forEach(function(res) {
        results.push( res );
    });
});

Но все зависит от того, чего вы пытаетесь избежать. Итак, к реальному ответу:


Первая проблема здесь заключается в том, что нет функции "ограничить" количество элементов, которые "толкаются" в массив. Это, безусловно, то, что нам бы хотелось, но функциональность в настоящее время не существует.

Вторая проблема заключается в том, что даже при нажатии всех элементов в массив вы не можете использовать $slice, или любой аналогичный оператор в конвейере агрегации. Таким образом, нет никакого способа получить только "топ-10" результатов из созданного массива с простой операцией.

Но вы можете фактически создать набор операций для эффективного "среза" на ваших границах группировки. Это довольно привлекательно, и, например, здесь я уменьшу элементы массива, "нарезанные" до "шестерки". Основная причина здесь состоит в том, чтобы продемонстрировать процесс и показать, как это сделать, не разрушая массивы, которые не содержат общее количество, которое вы хотите "нарезать".

Учитывая выборку документов:

{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 6, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 123 }
{ "_id" : 8, "conversation_ID" : 123 }
{ "_id" : 9, "conversation_ID" : 123 }
{ "_id" : 10, "conversation_ID" : 123 }
{ "_id" : 11, "conversation_ID" : 123 }
{ "_id" : 12, "conversation_ID" : 456 }
{ "_id" : 13, "conversation_ID" : 456 }
{ "_id" : 14, "conversation_ID" : 456 }
{ "_id" : 15, "conversation_ID" : 456 }
{ "_id" : 16, "conversation_ID" : 456 }

Вы можете видеть, что при группировке по вашим условиям вы получите один массив из десяти элементов, а другой с "пятью". То, что вы хотите сделать здесь, уменьшает как до верхнего "шестерки" без "уничтожения" массива, который будет соответствовать только "пяти" элементам.

И следующий запрос:

db.messages.aggregate([
    { "$group": {
        "_id": "$conversation_ID",
        "first": { "$first": "$_id" },
        "msgs": { "$push": "$_id" },
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "seen": { "$eq": [ "$first", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "seen": { "$eq": [ "$second", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "seen": { "$eq": [ "$third", "$msgs" ] },
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "seen": { "$eq": [ "$forth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$msgs" }
    }},
    { "$unwind": "$msgs" },
    { "$project": {
        "msgs": 1,
        "first": 1,
        "second": 1,
        "third": 1,
        "forth": 1,
        "fifth": 1,
        "seen": { "$eq": [ "$fifth", "$msgs" ] }
    }},
    { "$sort": { "seen": 1 }},
    { "$group": {
        "_id": "$_id",
        "msgs": { 
            "$push": {
               "$cond": [ { "$not": "$seen" }, "$msgs", false ]
            }
        },
        "first": { "$first": "$first" },
        "second": { "$first": "$second" },
        "third": { "$first": "$third" },
        "forth": { "$first": "$forth" },
        "fifth": { "$first": "$fifth" },
        "sixth": { "$first": "$msgs" },
    }},
    { "$project": {
         "first": 1,
         "second": 1,
         "third": 1,
         "forth": 1,
         "fifth": 1,
         "sixth": 1,
         "pos": { "$const": [ 1,2,3,4,5,6 ] }
    }},
    { "$unwind": "$pos" },
    { "$group": {
        "_id": "$_id",
        "msgs": {
            "$push": {
                "$cond": [
                    { "$eq": [ "$pos", 1 ] },
                    "$first",
                    { "$cond": [
                        { "$eq": [ "$pos", 2 ] },
                        "$second",
                        { "$cond": [
                            { "$eq": [ "$pos", 3 ] },
                            "$third",
                            { "$cond": [
                                { "$eq": [ "$pos", 4 ] },
                                "$forth",
                                { "$cond": [
                                    { "$eq": [ "$pos", 5 ] },
                                    "$fifth",
                                    { "$cond": [
                                        { "$eq": [ "$pos", 6 ] },
                                        "$sixth",
                                        false
                                    ]}
                                ]}
                            ]}
                        ]}
                    ]}
                ]
            }
        }
    }},
    { "$unwind": "$msgs" },
    { "$match": { "msgs": { "$ne": false } }},
    { "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }}
])

Вы получаете максимальные результаты в массиве, до шести записей:

{ "_id" : 123, "msgs" : [ 1, 2, 3, 4, 5, 6 ] }
{ "_id" : 456, "msgs" : [ 12, 13, 14, 15 ] }

Как вы можете видеть здесь, весело.

После того, как вы изначально сгруппировали, вы в основном хотите "выскочить" из $first значение стек для результатов массива. Чтобы упростить этот процесс, мы фактически делаем это в начальной операции. Таким образом, процесс становится следующим:

  • $unwind массив
  • Сравните с значениями, уже встречающимися с $eq соответствие равенства
  • $sortрезультаты для "float" false невидимых значений вверху (это все еще сохраняется)
  • $group назад и "поп" $first значение unseen как следующий элемент в стеке. Также используется оператор $cond для замены "видимых" значений в стеке массива с помощью false на помощь в оценке.

Заключительное действие с $cond заключается в том, чтобы будущие итерации не просто добавляли последнее значение массива снова и снова, где счетчик "срезов" больше, чем члены массива.

Весь этот процесс необходимо повторить для множества элементов, которые вы хотите "нарезать". Поскольку мы уже нашли "первый" элемент в начальной группировке, это означает, что n-1 итерации для желаемого результата среза.

Заключительные шаги на самом деле являются лишь дополнительной иллюстрацией преобразования всего в массивы для результата, как показано на рисунке. Так что на самом деле просто условно нажимайте элементы или false назад по их совпадающей позиции и, наконец, "отфильтровывайте" все значения false, чтобы конечные массивы имели "шесть" и "пять" элементов соответственно.

Таким образом, для этого не существует стандартного оператора, и вы не можете просто "ограничить" нажатие на 5 или 10 или любые другие элементы в массиве. Но если вам действительно нужно это сделать, тогда это ваш лучший подход.


Вы могли бы подойти к этому с помощью mapReduce и отказаться от структуры агрегации. Подход, который я бы принял (в разумных пределах), состоял бы в том, чтобы эффективно иметь хэш-карту в памяти на сервере и накапливать массивы, используя фрагмент JavaScript для "ограничения" результатов:

db.messages.mapReduce(
    function () {

        if ( !stash.hasOwnProperty(this.conversation_ID) ) {
            stash[this.conversation_ID] = [];
        }

        if ( stash[this.conversation_ID.length < maxLen ) {
            stash[this.conversation_ID].push( this._id );
            emit( this.conversation_ID, 1 );
        }

    },
    function(key,values) {
        return 1;   // really just want to keep the keys
    },
    { 
        "scope": { "stash": {}, "maxLen": 10 },
        "finalize": function(key,value) {
            return { "msgs": stash[key] };                
        },
        "out": { "inline": 1 }
    }
)

Так что просто в основном создается объект "in-memory", соответствующий испускаемым "ключам" с массивом, никогда не превышающим максимальный размер, который вы хотите извлечь из ваших результатов. Кроме того, это даже не мешает "испускать" элемент, когда выполняется максимальный стек.

Уменьшенная часть фактически ничего не делает, кроме как по существу, просто сводится к "ключу" и одному значению. Таким образом, на всякий случай, когда наш редуктор не вызывался, как это было бы верно, если бы существовало только 1 значение для ключа, функция finalize заботится о том, чтобы сопоставить клавиши "stash" с конечным результатом.

Эффективность этого зависит от размера вывода, и оценка JavaScript, конечно же, не выполняется быстро, но, возможно, быстрее, чем обработка больших массивов в конвейере.


Прогоните JIRA issues, чтобы на самом деле иметь оператор "среза" или даже "лимит" на "$push" и "$addToSet", что было бы удобно. Лично надеясь, что по крайней мере некоторые изменения могут быть внесены в оператор $map, чтобы разоблачить "текущий индекс", значение при обработке. Это позволит эффективно разрезать и другие операции.

Действительно, вы захотите закодировать это, чтобы "генерировать" все требуемые итерации. Если ответ здесь получает достаточно любви и/или другого времени в ожидании, что у меня есть в tuits, я мог бы добавить код, чтобы продемонстрировать, как это сделать. Это уже достаточно длинный ответ.


Код для создания конвейера:

var key = "$conversation_ID";
var val = "$_id";
var maxLen = 10;

var stack = [];
var pipe = [];
var fproj = { "$project": { "pos": { "$const": []  } } };

for ( var x = 1; x <= maxLen; x++ ) {

    fproj["$project"][""+x] = 1;
    fproj["$project"]["pos"]["$const"].push( x );

    var rec = {
        "$cond": [ { "$eq": [ "$pos", x ] }, "$"+x ]
    };
    if ( stack.length == 0 ) {
        rec["$cond"].push( false );
    } else {
        lval = stack.pop();
        rec["$cond"].push( lval );
    }

    stack.push( rec );

    if ( x == 1) {
        pipe.push({ "$group": {
           "_id": key,
           "1": { "$first": val },
           "msgs": { "$push": val }
        }});
    } else {
        pipe.push({ "$unwind": "$msgs" });
        var proj = {
            "$project": {
                "msgs": 1
            }
        };

        proj["$project"]["seen"] = { "$eq": [ "$"+(x-1), "$msgs" ] };

        var grp = {
            "$group": {
                "_id": "$_id",
                "msgs": {
                    "$push": {
                        "$cond": [ { "$not": "$seen" }, "$msgs", false ]
                    }
                }
            }
        };

        for ( n=x; n >= 1; n-- ) {
            if ( n != x ) 
                proj["$project"][""+n] = 1;
            grp["$group"][""+n] = ( n == x ) ? { "$first": "$msgs" } : { "$first": "$"+n };
        }

        pipe.push( proj );
        pipe.push({ "$sort": { "seen": 1 } });
        pipe.push(grp);
    }
}

pipe.push(fproj);
pipe.push({ "$unwind": "$pos" });
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": stack[0] }
    }
});
pipe.push({ "$unwind": "$msgs" });
pipe.push({ "$match": { "msgs": { "$ne": false } }});
pipe.push({
    "$group": {
        "_id": "$_id",
        "msgs": { "$push": "$msgs" }
    }
}); 

Создает базовый итеративный подход до maxLen с шагом от $unwind до $group. Также в нем содержатся подробные сведения о необходимых финальных прогнозах и "вложенном" условном утверждении. Последнее, в основном, относится к этому вопросу:

Предоставляет ли заказ гарантии MongoDB $?

Ответ 2

Оператор $slice не является агрегирующим оператором, поэтому вы не можете сделать это (как я предложил в этом ответе перед редактированием):

db.messages.aggregate([
   { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
   { $project : { _id : 1, msgs : { $slice : 10 }}}]);

Ответ на Neil очень подробный, но вы можете использовать несколько иной подход (если он подходит для вашего случая использования). Вы можете агрегировать свои результаты и выводить их в новую коллекцию:

db.messages.aggregate([
   { $group : {_id:'$conversation_ID',msgs: { $push: { msgid:'$_id' }}}},
   { $out : "msgs_agg" }
]);

Оператор $out будет записывать результаты агрегации в новую коллекцию. Затем вы можете использовать обычный проект запроса поиска для своих результатов с помощью оператора $slice:

db.msgs_agg.find({}, { msgs : { $slice : 10 }});

Для этих тестовых документов:

> db.messages.find().pretty();
{ "_id" : 1, "conversation_ID" : 123 }
{ "_id" : 2, "conversation_ID" : 123 }
{ "_id" : 3, "conversation_ID" : 123 }
{ "_id" : 4, "conversation_ID" : 123 }
{ "_id" : 5, "conversation_ID" : 123 }
{ "_id" : 7, "conversation_ID" : 1234 }
{ "_id" : 8, "conversation_ID" : 1234 }
{ "_id" : 9, "conversation_ID" : 1234 }

Результат будет:

> db.msgs_agg.find({}, { msgs : { $slice : 10 }});
{ "_id" : 1234, "msgs" : [ { "msgid" : 7 }, { "msgid" : 8 }, { "msgid" : 9 } ] }
{ "_id" : 123, "msgs" : [ { "msgid" : 1 }, { "msgid" : 2 }, { "msgid" : 3 }, 
                          { "msgid" : 4 }, { "msgid" : 5 } ] }

Edit

Я предполагаю, что это будет означать дублирование всей коллекции сообщений. Разве это не избыток?

Хорошо, очевидно, что этот подход не будет масштабироваться с огромными коллекциями. Но, поскольку вы планируете использовать большие агрегатные конвейеры или большие задания с уменьшением размера карты, вы, вероятно, не будете использовать это для запросов "в реальном времени".

Есть много недостатков такого подхода: ограничение BSON на 16 МБ, если вы создаете огромные документы с агрегацией, тратите дисковое пространство/память с дублированием, увеличиваете диск IO...

Плюсы этого подхода: его просто реализовать и, следовательно, легко изменить. Если ваша коллекция редко обновляется, вы можете использовать эту "вне" коллекцию, такую ​​как кеш. Таким образом, вам не нужно было бы выполнять операцию агрегации несколько раз, и тогда вы могли бы даже поддерживать клиентские запросы "в реальном времени" в коллекции "вне". Чтобы обновить свои данные, вы можете периодически выполнять агрегацию (например, в фоновом задании, которое выполняется в ночное время).

Как было сказано в комментариях, это непростая задача, и нет идеального решения для этого (пока!). Я показал вам другой подход, который вы можете использовать, это зависит от вас, чтобы оценить и решить, что наиболее подходит для вашего случая использования.