在Nodejs中,何时关闭MongoDB数据库连接



通过Node MongoDB本地驱动程序与Nodejs和MongoDB一起工作。需要检索一些文档,并进行修改,然后保存回来。下面是一个示例:

db.open(function (err, db) {
  db.collection('foo', function (err, collection) {
    var cursor = collection.find({});
    cursor.each(function (err, doc) {
      if (doc != null) {
        doc.newkey = 'foo'; // Make some changes
        db.save(doc); // Update the document
      } else {
        db.close(); // Closing the connection
      }
    });
  });
});

具有异步特性,如果更新文档的过程花费的时间较长,那么当游标到达文档末尾时,数据库连接将关闭。不是所有的更新都保存到数据库。

如果省略db.close(),则所有文档都正确更新,但应用程序挂起,永远不会退出。

我看到一个帖子建议使用计数器跟踪更新的数量,当下降到零,然后关闭数据库。但我做错什么了吗?处理这种情况的最好方法是什么?是否必须使用db.close()来释放资源?还是需要打开一个新的数据库连接?

这是一个基于计数方法的潜在解决方案(我还没有测试它,没有错误捕获,但它应该传达了这个想法)。

基本策略是:获取需要更新多少条记录的计数,异步保存每条记录,并在成功时回调,如果计数达到0(当最后一次更新完成时)将减少计数并关闭DB。通过使用{safe:true},我们可以确保每次更新都是成功的。

mongo服务器每个连接将使用一个线程,所以最好是a)关闭未使用的连接,或者b)池/重用它们。

db.open(function (err, db) {
  db.collection('foo', function (err, collection) {
    var cursor = collection.find({});
    cursor.count(function(err,count)){
      var savesPending = count;
      if(count == 0){
        db.close();
        return;
      }
      var saveFinished = function(){
        savesPending--;
        if(savesPending == 0){
          db.close();
        }
      }
      cursor.each(function (err, doc) {
        if (doc != null) {
          doc.newkey = 'foo'; // Make some changes
          db.save(doc, {safe:true}, saveFinished);
        }
      });
    })
  });
});

最好使用池连接,然后在应用程序生命周期结束时调用db.close()中的清理函数:

process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);

见http://mongodb.github.io/node-mongodb-native/driver-articles/mongoclient.html

有点旧的线程,但无论如何。

这里是对pkopac给出的答案的扩展示例,因为我必须弄清楚其余的细节:

const client = new MongoClient(uri);
(async () => await client.connect())();
// use client to work with db
const find = async (dbName, collectionName) => {
  try {
    const collection = client.db(dbName).collection(collectionName);
    const result = await collection.find().toArray()
    return result;
  } catch (err) {
    console.error(err);
  }
}
const cleanup = (event) => { // SIGINT is sent for example when you Ctrl+C a running process from the command line.
  client.close(); // Close MongodDB Connection when Process ends
  process.exit(); // Exit with default success-code '0'.
}
process.on('SIGINT', cleanup);
process.on('SIGTERM', cleanup);

这里是SIGINTSIGTERM之间差异的链接。我必须添加process.exit(),否则我的节点web服务器在命令行中对运行进程执行Ctrl + C时没有干净地退出。

我发现使用counter可能适用于简单的场景,但在复杂的情况下可能很难。下面是我提出的一个解决方案,当数据库连接空闲时关闭数据库连接:

var dbQueryCounter = 0;
var maxDbIdleTime = 5000; //maximum db idle time
var closeIdleDb = function(connection){
  var previousCounter = 0;
  var checker = setInterval(function(){
    if (previousCounter == dbQueryCounter && dbQueryCounter != 0) {
        connection.close();
        clearInterval(closeIdleDb);
    } else {
        previousCounter = dbQueryCounter;
    }
  }, maxDbIdleTime);
};
MongoClient.connect("mongodb://127.0.0.1:27017/testdb", function(err, connection)(
  if (err) throw err;
  connection.collection("mycollection").find({'a':{'$gt':1}}).toArray(function(err, docs) {
    dbQueryCounter ++;
  });   
  //do any db query, and increase the dbQueryCounter
  closeIdleDb(connection);
));

这可以是任何数据库连接的通用解决方案。maxDbIdleTime可以设置为与db查询超时时间相同或更长。

这不是很优雅,但我想不出更好的方法来做到这一点。我使用NodeJs运行查询MongoDb和Mysql的脚本,如果数据库连接没有正确关闭,脚本将永远挂在那里。

我想到了一个解决办法。它避免了使用toArray,它非常简短:

var MongoClient = require('mongodb').MongoClient;
MongoClient.connect("mongodb://localhost:27017/mydb", function(err, db) {
  let myCollection = db.collection('myCollection');
  let query = {}; // fill in your query here
  let i = 0;
  myCollection.count(query, (err, count) => { 
    myCollection.find(query).forEach((doc) => {
      // do stuff here
      if (++i == count) db.close();
    });
  });
});

我想到了一个包含这样一个计数器的解决方案。它不依赖于count()调用,也不等待超时。它将在each()中的所有文档耗尽后关闭数据库。

var mydb = {}; // initialize the helper object.
mydb.cnt = {}; // init counter to permit multiple db objects.
mydb.open = function(db) // call open to inc the counter.
{
  if( !mydb.cnt[db.tag] ) mydb.cnt[db.tag] = 1;
  else mydb.cnt[db.tag]++;
}; 
mydb.close = function(db) // close the db when the cnt reaches 0.
{
  mydb.cnt[db.tag]--;
  if ( mydb.cnt[db.tag] <= 0 ) {
    delete mydb.cnt[db.tag];
    return db.close();
  }
  return null;
};

因此,每次你要调用db.each()或db.save()时,你将使用这些方法来确保数据库在工作时准备好并在完成时关闭。

示例来自OP:

foo = db.collection('foo');
mydb.open(db); // *** Add here to init the counter.**  
foo.find({},function(err,cursor)
{
  if( err ) throw err; 
  cursor.each(function (err, doc)
  {
    if( err ) throw err;
    if (doc != null) {
      doc.newkey = 'foo';
      mydb.open(db); // *** Add here to prevent from closing prematurely **
      foo.save(doc, function(err,count) {
        if( err ) throw err;
        mydb.close(db); // *** Add here to close when done. **
      }); 
    } else {
      mydb.close(db); // *** Close like this instead. **
    }
  });
});

现在,假设每个回调函数的倒数第二个回调函数都通过了mydb.open(),然后每个回调函数的最后一个回调函数到达了mydb.close()....所以,当然,如果这是一个问题,请告诉我。

所以:在db调用之前放一个mydb.open(db),在回调的返回点或在db调用之后(取决于调用类型)放一个mydb.close(db)。

在我看来,这种计数器应该在db对象中维护,但这是我目前的解决方案。也许我们可以创建一个新对象,在构造函数中使用db,并包装mongodb函数以更好地处理关闭。

根据@mpobrien上面的建议,我发现async模块在这方面非常有帮助。下面是我采用的一个示例模式:

const assert = require('assert');
const async = require('async');
const MongoClient = require('mongodb').MongoClient;
var mongodb;
async.series(
    [
        // Establish Covalent Analytics MongoDB connection
        (callback) => {
            MongoClient.connect('mongodb://localhost:27017/test', (err, db) => {
                assert.equal(err, null);
                mongodb = db;
                callback(null);
            });
        },
        // Insert some documents
        (callback) => {
            mongodb.collection('sandbox').insertMany(
                [{a : 1}, {a : 2}, {a : 3}],
                (err) => {
                    assert.equal(err, null);
                    callback(null);
                }
            )
        },
        // Find some documents
        (callback) => {
            mongodb.collection('sandbox').find({}).toArray(function(err, docs) {
                assert.equal(err, null);
                console.dir(docs);
                callback(null);
            });
        }
    ],
    () => {
        mongodb.close();
    }
);

无需计数器、库或任何自定义代码的现代方法:

let MongoClient = require('mongodb').MongoClient;
let url = 'mongodb://yourMongoDBUrl';
let database = 'dbName';
let collection = 'collectionName';
MongoClient.connect(url, { useNewUrlParser: true }, (mongoError, mongoClient) => {
   if (mongoError) throw mongoError;
   // query as an async stream
   let stream = mongoClient.db(database).collection(collection)
        .find({}) // your query goes here
        .stream({
          transform: (readElement) => {
            // here you can transform each element before processing it
            return readElement;
          }
        });
   // process each element of stream (async)
   stream.on('data', (streamElement) => {
        // here you process the data
        console.log('single element processed', streamElement);
   });
   // called only when stream has no pending elements to process
   stream.once('end', () => {
     mongoClient.close().then(r => console.log('db successfully closed'));
   });
});

在版本3.2.7的mongodb驱动上测试,但根据链接可能是有效的,因为版本2.0

最新更新