使用 mongoose 在 MongoDB 中批量更新插入

IT技术 javascript node.js mongodb mongoose mongodb-query
2021-01-24 19:43:32

是否有任何选项可以使用 mongoose 执行批量 upsert?所以基本上有一个数组,如果不存在则插入每个元素,如果存在则更新它?(我正在使用海关 _ids)

当我确实使用.insert MongoDB 时,会为重复键(应该更新)返回错误 E11000。插入多个新文档虽然工作正常:

var Users = self.db.collection('Users');

Users.insert(data, function(err){
            if (err) {
                callback(err);
            }
            else {
                callback(null);
            }
        });

使用.save 会返回参数必须是单个文档的错误:

Users.save(data, function(err){
   ...
}

这个答案表明没有这样的选项,但它是特定于 C# 的,并且已经 3 岁了。所以我想知道是否有任何选项可以使用mongoose来做到这一点?

谢谢!

6个回答

具体不是“mongoose”,或者至少在写作时还没有。从 2.6 版开始,MongoDB shell 实际上在“幕后”中使用了“批量操作 API ”,因为它用于所有通用辅助方法。在它的实现中,它首先尝试执行此操作,如果检测到旧版本服务器,则会“回退”到旧版实现。

所有 mongoose 方法“当前”都使用“遗留”实现或写关注响应和基本遗留方法。但是.collection任何给定的 mongoose 模型都有一个访问器,它本质上访问来自底层“节点本机驱动程序”的“集合对象”,在该驱动程序上实现了 mongoose:

 var mongoose = require('mongoose'),
     Schema = mongoose.Schema;

 mongoose.connect('mongodb://localhost/test');

 var sampleSchema  = new Schema({},{ "strict": false });

 var Sample = mongoose.model( "Sample", sampleSchema, "sample" );

 mongoose.connection.on("open", function(err,conn) { 

    var bulk = Sample.collection.initializeOrderedBulkOp();
    var counter = 0;

    // representing a long loop
    for ( var x = 0; x < 100000; x++ ) {

        bulk.find(/* some search */).upsert().updateOne(
            /* update conditions */
        });
        counter++;

        if ( counter % 1000 == 0 )
            bulk.execute(function(err,result) {             
                bulk = Sample.collection.initializeOrderedBulkOp();
            });
    }

    if ( counter % 1000 != 0 )
        bulk.execute(function(err,result) {
           // maybe do something with result
        });

 });

主要的问题是“mongoose方法”实际上意识到可能实际上还没有建立连接并在完成之前“排队”。您正在“深入研究”的本机驱动程序并没有做出这种区分。

所以你真的必须意识到连接是以某种方式或形式建立的。但是您可以使用本机驱动程序方法,只要您小心自己在做什么。

@joeytwiddle“批量”操作在您调用.execute(). 目的是服务器的任何“来回”都会消耗 IO,因此您正在尝试将其最小化。确实,在同步循环中,您可能会.execute()发生多次并使用多个连接。但是您可以使用类似async.whilst或其他控件来更改它,其中迭代可以由回调(因此在内部.execute())控制以处理完成。用 Promise 做这件事有点困难,但仍然有可能。
2021-03-30 19:43:32
谢谢!这很好用。我会喜欢 joao 的方法,但我没有设法使用 .update() 上传多个文档......当然我可以在 for 循环中完成,但我想批量上传更有效?还是没有区别,因为数据库连接无论如何都是打开的?
2021-04-01 19:43:32
@ user3122267 Upsert ant Bulk 基本上是“粉笔和奶酪”,不一样甚至接近。“upsert”创建一个不存在的新文档,而“Bulk”是批量操作。另一个选项是“multi”,因为.update()默认情况下只会修改“第一个”找到的文档。喜欢这个方法吗?看到那些一无所知的评论员与真正有知识的回答者的巨大区别了吗?
2021-04-04 19:43:32
我注意到这个答案和@konsumer 的所有记录都是同步循环的。我很好奇bulk在一次滴答中创建 10 个操作的性能差异,而不是bulk在 10 个单独的滴答中创建 10 个操作(就 Node 中的内存使用而言)。
2021-04-10 19:43:32
@zstew 提出新问题的正确位置是提出另一个问题,而不是评论旧帖子。您似乎错过了本答案末尾的陈述。如果您仍然不明白这意味着什么,请再问一个问题。
2021-04-14 19:43:32

您不需要像@neil-lunn 建议的那样管理限制(1000)。mongoose已经这样做了。我使用他的出色回答作为这个完整的基于 Promise 的实现和示例的基础:

var Promise = require('bluebird');
var mongoose = require('mongoose');

var Show = mongoose.model('Show', {
  "id": Number,
  "title": String,
  "provider":  {'type':String, 'default':'eztv'}
});

/**
 * Atomic connect Promise - not sure if I need this, might be in mongoose already..
 * @return {Priomise}
 */
function connect(uri, options){
  return new Promise(function(resolve, reject){
    mongoose.connect(uri, options, function(err){
      if (err) return reject(err);
      resolve(mongoose.connection);
    });
  });
}

/**
 * Bulk-upsert an array of records
 * @param  {Array}    records  List of records to update
 * @param  {Model}    Model    Mongoose model to update
 * @param  {Object}   match    Database field to match
 * @return {Promise}  always resolves a BulkWriteResult
 */
function save(records, Model, match){
  match = match || 'id';
  return new Promise(function(resolve, reject){
    var bulk = Model.collection.initializeUnorderedBulkOp();
    records.forEach(function(record){
      var query = {};
      query[match] = record[match];
      bulk.find(query).upsert().updateOne( record );
    });
    bulk.execute(function(err, bulkres){
        if (err) return reject(err);
        resolve(bulkres);
    });
  });
}

/**
 * Map function for EZTV-to-Show
 * @param  {Object} show EZTV show
 * @return {Object}      Mongoose Show object
 */
function mapEZ(show){
  return {
    title: show.title,
    id: Number(show.id),
    provider: 'eztv'
  };
}

// if you are  not using EZTV, put shows in here
var shows = []; // giant array of {id: X, title: "X"}

// var eztv = require('eztv');
// eztv.getShows({}, function(err, shows){
//   if(err) return console.log('EZ Error:', err);

//   var shows = shows.map(mapEZ);
  console.log('found', shows.length, 'shows.');
  connect('mongodb://localhost/tv', {}).then(function(db){
    save(shows, Show).then(function(bulkRes){
      console.log('Bulk complete.', bulkRes);
      db.close();
    }, function(err){
        console.log('Bulk Error:', err);
        db.close();
    });
  }, function(err){
    console.log('DB Error:', err);
  });

// });

这样做的好处是在连接完成后关闭连接,如果您关心,则显示任何错误,如果不关心,则忽略它们(Promise 中的错误回调是可选的。)它也非常快。只是把这个留在这里分享我的发现。例如,如果您想将所有 eztv 节目保存到数据库中,您可以取消注释 eztv 内容。

@ECMAScript 事实上,Neil 和 konsumer 的建议都消耗了相似数量的 Node 内存,因为这两种技术都会在不等待 Mongo 响应的情况下继续创建文档。显然,如果您打算插入的文档多于 RAM 中的容量,这只是一个问题。
2021-03-19 19:43:32
@PirateApp 也许您的内存不足,无法保存结构?你得到的错误是什么?如果您没有足够的内存来保存它,您可能必须使用串行Promise来一个一个地运行它们或批量运行它们。
2021-03-30 19:43:32
这不会消耗更多内存吗?
2021-04-11 19:43:32
比什么消耗更多的内存?
2021-04-14 19:43:32
是的。这就是bulk.execute它的作用。docs.mongodb.org/v3.0/reference/method/...
2021-04-14 19:43:32
await Model.bulkWrite(docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
})))


或者更详细:

const bulkOps = docs.map(doc => ({
    updateOne: {
        filter: {id: doc.id},
        update: doc,
        upsert: true
    }
}))

Model.bulkWrite(bulkOps)
        .then(bulkWriteOpResult => console.log('BULK update OK:', bulkWriteOpResult))
        .catch(err => console.error('BULK update error:', err))

https://stackoverflow.com/a/60330161/5318303

我已经为 Mongoose 发布了一个插件,它公开了一个静态upsertMany方法来使用 promise 接口执行批量 upsert 操作。

使用这个插件而不是在底层集合上初始化你自己的批量操作的一个额外好处是,这个插件首先将你的数据转换为 Mongoose 模型的数据,然后在 upsert 之前转换回普通对象。这可确保应用 Mongoose 模式验证,并减少数据填充并适合原始插入。

https://github.com/meanie/mongoose-upsert-many https://www.npmjs.com/package/@meanie/mongoose-upsert-many

希望能帮助到你!

如果您没有在 db.collection 中看到批量方法,即您收到 xxx 变量没有方法的影响的错误: initializeOrderedBulkOp()

尝试更新您的mongoose版本。显然,较旧的 mongoose 版本不会通过所有底层 mongo db.collection 方法。

npm 安装mongoose

为我照顾它。