Mongodb перемещает документы из одной коллекции в другую коллекцию
Как документы перемещаются из одной коллекции в другую коллекцию в MongoDB? Например: у меня есть много документов в коллекции A, и я хочу переместить все 1 месяц старые документы в коллекцию B (эти 1 месяц старше документы не должны быть в коллекции A).
Используя агрегацию, мы можем сделать копию. Но я пытаюсь сделать перемещение документов.
Какой метод можно использовать для перемещения документов?
Ответы
Ответ 1
Обновление
Этот ответ by @jasongarber является более безопасным подходом и должен использоваться вместо моего.
При условии, что я получил вас правильно, и вы хотите переместить все документы старше 1 месяца, и вы используете mongoDB 2.6, нет причин не использовать массовые операции, которые являются наиболее эффективным способом выполнения нескольких операций, о которых я знаю
> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
function(doc){
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
}
)
> bulkInsert.execute()
> bulkRemove.execute()
Это должно быть довольно быстро, и это имеет то преимущество, что если что-то пойдет не так во время объемной вставки, исходные данные все еще существуют.
Edit
Чтобы предотвратить чрезмерное использование памяти, вы можете выполнить массовую операцию на каждом обработанном документе x
:
> var bulkInsert = db.target.initializeUnorderedBulkOp()
> var bulkRemove = db.source.initializeUnorderedBulkOp()
> var x = 10000
> var counter = 0
> var date = new Date()
> date.setMonth(date.getMonth() -1)
> db.source.find({"yourDateField":{$lt: date}}).forEach(
function(doc){
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
counter ++
if( counter % x == 0){
bulkInsert.execute()
bulkRemove.execute()
bulkInsert = db.target.initializeUnorderedBulkOp()
bulkRemove = db.source.initializeUnorderedBulkOp()
}
}
)
> bulkInsert.execute()
> bulkRemove.execute()
Ответ 2
Вставьте и удалите:
var documentsToMove = db.collectionA.find({});
documentsToMove.forEach(function(doc) {
db.collectionB.insert(doc);
db.collectionA.remove(doc);
}
Ответ 3
Это повторение @Markus W Mahlberg
Возвращение благосклонности - как функция
function moveDocuments(sourceCollection,targetCollection,filter) {
var bulkInsert = targetCollection.initializeUnorderedBulkOp();
var bulkRemove = sourceCollection.initializeUnorderedBulkOp();
sourceCollection.find(filter)
.forEach(function(doc) {
bulkInsert.insert(doc);
bulkRemove.find({_id:doc._id}).removeOne();
}
)
bulkInsert.execute();
bulkRemove.execute();
}
В примере используется
var x = {dsid:{$exists: true}};
moveDocuments(db.pictures,db.artifacts,x)
чтобы переместить все документы, содержащие dsid верхнего уровня из изображений в коллекцию артефактов.
Ответ 4
Основные операции @markus-w-mahlberg показали (и @mark-mullin уточнены) эффективны, но небезопасны, как написано. Если bulkInsert завершится с ошибкой, функция bulkRemove будет продолжена. Чтобы убедиться, что вы не теряете записи при перемещении, используйте это вместо:
function insertBatch(collection, documents) {
var bulkInsert = collection.initializeUnorderedBulkOp();
var insertedIds = [];
var id;
documents.forEach(function(doc) {
id = doc._id;
// Insert without raising an error for duplicates
bulkInsert.find({_id: id}).upsert().replaceOne(doc);
insertedIds.push(id);
});
bulkInsert.execute();
return insertedIds;
}
function deleteBatch(collection, documents) {
var bulkRemove = collection.initializeUnorderedBulkOp();
documents.forEach(function(doc) {
bulkRemove.find({_id: doc._id}).removeOne();
});
bulkRemove.execute();
}
function moveDocuments(sourceCollection, targetCollection, filter, batchSize) {
print("Moving " + sourceCollection.find(filter).count() + " documents from " + sourceCollection + " to " + targetCollection);
var count;
while ((count = sourceCollection.find(filter).count()) > 0) {
print(count + " documents remaining");
sourceDocs = sourceCollection.find(filter).limit(batchSize);
idsOfCopiedDocs = insertBatch(targetCollection, sourceDocs);
targetDocs = targetCollection.find({_id: {$in: idsOfCopiedDocs}});
deleteBatch(sourceCollection, targetDocs);
}
print("Done!")
}
Ответ 5
Возможно, с точки зрения производительности лучше удалить много документов с помощью одной команды (особенно если у вас есть индексы для части запроса), а не удалять их по очереди.
Например:
db.source.find({$gte: start, $lt: end}).forEach(function(doc){
db.target.insert(doc);
});
db.source.remove({$gte: start, $lt: end});
Ответ 6
Из MongoDB 3.0 вверх вы можете использовать команду copyTo со следующим синтаксисом:
db.source_collection.copyTo("target_collection")
Затем вы можете использовать команду drop, чтобы удалить старую коллекцию:
db.source_collection.drop()
Ответ 7
$out используется для создания новой коллекции с данными, поэтому используйте $out
db.oldCollection.aggregate([{$out : "newCollection"}])
затем используйте drop
db.oldCollection.drop()
Ответ 8
вы можете использовать запрос диапазона для получения данных из sourceCollection и держать данные курсора в переменной и петле на нем и вставлять в целевую коллекцию:
var doc = db.sourceCollection.find({
"Timestamp":{
$gte:ISODate("2014-09-01T00:00:00Z"),
$lt:ISODate("2014-10-01T00:00:00Z")
}
});
doc.forEach(function(doc){
db.targetCollection.insert(doc);
})
Надеюсь, что это поможет!
Ответ 9
Мне нравится ответ от @markus-w-mahlberg, однако время от времени я видел необходимость держать его немного проще для людей. Таким образом, у меня есть несколько функций, которые ниже. Естественно, вы могли бы обернуть все это с помощью массовых операторов, но этот код работает с новыми и старыми системами Mongo одинаково.
function parseNS(ns){
//Expects we are forcing people to not violate the rules and not doing "foodb.foocollection.month.day.year" if they do they need to use an array.
if (ns instanceof Array){
database = ns[0];
collection = ns[1];
}
else{
tNS = ns.split(".");
if (tNS.length > 2){
print('ERROR: NS had more than 1 period in it, please pass as an [ "dbname","coll.name.with.dots"] !');
return false;
}
database = tNS[0];
collection = tNS[1];
}
return {database: database,collection: collection};
}
function insertFromCollection( sourceNS, destNS, query, batchSize, pauseMS){
//Parse and check namespaces
srcNS = parseNS(sourceNS);
destNS = parseNS(destNS);
if ( srcNS == false || destNS == false){return false;}
batchBucket = new Array();
totalToProcess = db.getDB(srcNS.database).getCollection(srcNS.collection).find(query,{_id:1}).count();
currentCount = 0;
print("Processed "+currentCount+"/"+totalToProcess+"...");
db.getDB(srcNS.database).getCollection(srcNS.collection).find(query).addOption(DBQuery.Option.noTimeout).forEach(function(doc){
batchBucket.push(doc);
if ( batchBucket.length > batchSize){
db.getDB(destNS.database).getCollection(destNS.collection)insert(batchBucket);
currentCount += batchBucket.length;
batchBucket = [];
sleep (pauseMS);
print("Processed "+currentCount+"/"+totalToProcess+"...");
}
}
print("Completed");
}
/** Example Usage:
insertFromCollection("foo.bar","foo2.bar",{"type":"archive"},1000,20);
Очевидно, вы можете добавить db.getSiblingDB(srcNS.database).getCollection(srcNS.collection).remove(query,true)
Если вы захотите также удалить записи после их копирования в новое место. Код можно легко создать таким образом, чтобы сделать его перезагруженным.
Ответ 10
Я планировал архивировать 1000 записей за раз, используя методы bulkinsert и bulkdelete для pymongo.
Для источника и цели
-
создать объекты mongodb для подключения к базе данных.
-
создать экземпляр объемных объектов. Примечание. Я создал резервную копию больших объектов. Это поможет мне отложить вставку или удаление при возникновении ошибки.
Пример:
Для источника
// replace this with mongodb object creation logic
source_db_obj = db_help.create_db_obj(source_db, source_col)
source_bulk = source_db_obj.initialize_ordered_bulk_op()
source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
Для целей
// replace this with mogodb object creation logic
target_db_obj = db_help.create_db_obj(target_db, target_col)
target_bulk = target_db_obj.initialize_ordered_bulk_op()
target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
-
Получить исходные записи, соответствующие критериям фильтра
source_find_results = source_db_obj.find(фильтр)
-
Прокрутка записей источника
создать целевые и исходные групповые операции
Добавить поле archived_at с текущим datetime в целевую коллекцию
//replace this with the logic to obtain the UTCtime.
doc['archived_at'] = db_help.getUTCTime()
target_bulk.insert(document)
source_bulk.remove(document)
для отката в случае каких-либо ошибок или исключений, создайте операции target_bulk_bak и source_bulk_bak.
target_bulk_bak.find({'_id':doc['_id']}).remove_one()
source_bulk_bak.insert(doc)
//remove the extra column
doc.pop('archieved_at', None)
-
Когда количество записей достигает 1000, выполните перемещение цели - объем и удаление источника. Примечание. Этот метод принимает объекты target_bulk и source_bulk для выполнения.
execute_bulk_insert_remove (source_bulk, target_bulk)
-
Когда возникает исключение, выполните удаление target_bulk_bak и inesertions source_bulk_bak. Это отменит изменения. Поскольку mongodb не имеет отката, я придумал этот хак
execute_bulk_insert_remove (source_bulk_bak, target_bulk_bak)
-
Наконец, повторно инициализируйте исходные и целевые объекты bulk и bulk_bak. Это необходимо, потому что вы можете использовать их только один раз.
-
Полный код
def execute_bulk_insert_remove(source_bulk, target_bulk):
try:
target_bulk.execute()
source_bulk.execute()
except BulkWriteError as bwe:
raise Exception(
"could not archive document, reason: {}".format(bwe.details))
def archive_bulk_immediate(filter, source_db, source_col, target_db, target_col):
"""
filter: filter criteria for backup
source_db: source database name
source_col: source collection name
target_db: target database name
target_col: target collection name
"""
count = 0
bulk_count = 1000
source_db_obj = db_help.create_db_obj(source_db, source_col)
source_bulk = source_db_obj.initialize_ordered_bulk_op()
source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
target_db_obj = db_help.create_db_obj(target_db, target_col)
target_bulk = target_db_obj.initialize_ordered_bulk_op()
target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
source_find_results = source_db_obj.find(filter)
start = datetime.now()
for doc in source_find_results:
doc['archived_at'] = db_help.getUTCTime()
target_bulk.insert(doc)
source_bulk.find({'_id': doc['_id']}).remove_one()
target_bulk_bak.find({'_id': doc['_id']}).remove_one()
doc.pop('archieved_at', None)
source_bulk_bak.insert(doc)
count += 1
if count % 1000 == 0:
logger.info("count: {}".format(count))
try:
execute_bulk_insert_remove(source_bulk, target_bulk)
except BulkWriteError as bwe:
execute_bulk_insert_remove(source_bulk_bak, target_bulk_bak)
logger.info("Bulk Write Error: {}".format(bwe.details))
raise
source_bulk = source_db_obj.initialize_ordered_bulk_op()
source_bulk_bak = source_db_obj.initialize_ordered_bulk_op()
target_bulk = target_db_obj.initialize_ordered_bulk_op()
target_bulk_bak = target_db_obj.initialize_ordered_bulk_op()
end = datetime.now()
logger.info("archived {} documents to {} in ms.".format(
count, target_col, (end - start)))