使用 knex 从文件读取并插入数据库结束错误



我有一小段代码,它从文件中读取一行,解析它,然后插入到我的数据库中。

但是在 10 到 12 千行之后,我总是收到此错误:

Unhandled rejection Error: Knex: Timeout acquiring a connection. The pool is probably full. Are you missing a .transacting(trx) call?

第一个文件大约有 150k 行。

已经尝试直接操纵交易,但没有成功。

关于如何正确管理资源以制作所有文件的任何想法?

这是我

现在正在尝试的代码:

var fs = require('fs');
var knexfile = require('./knexfile');
var knex = require('knex')(knexfile.production);
var readline = require('readline');
var rl = readline.createInterface({
  terminal : false,
  input : fs.createReadStream('FOO_BAR.TXT') // About 150k lines
});
knex.transaction(function(tx){
  rl.on('line', function(line) { 
    knex("dadosbrutos").insert({ // this table does exists
      AA_DATA : line.substring(0,8),
      BB_DATA : line.substring(8,16),
      CC_DATA : line.substring(36,44)
    }).then(function(){
      tx.commit(); // dies after 12k inserts
    });
  });
});

我创建了一个模块来承诺阅读行 https://www.npmjs.com/package/readline-promise,但我的答案包括这篇文章中的源代码。

还要确保不超过数据库的最大事务大小。

有没有理由需要使用事务? 如果没有,您可以在没有事务的情况下进行所有插入。

var fs = require('fs');
var promise = require('bluebird');
var knexfile = require('./knexfile');
var knex = require('knex')(knexfile.production);
// iterates through each line calling callback
function each(cfg) {
    return function(callback) {
        return new promise(function(resolve, reject) {
            // create an array to store callbacks
            var rl, cbs = [];
            // create an interface
            try {
                rl = readline.createInterface(cfg);
            }
            catch(err) {
                return reject(err);
            }
            // handle a new line
            rl.on('line', function(line) {
                cbs.push(callback(line));
            });
            // handle close
            rl.on('close', function() {
                promise.all(cbs).then(function() {
                    resolve({
                        lines: cbs.length
                    });
                })
                .caught(function(err) {
                    reject(err);
                });
            });
        });
    };
}
knex.transaction(function(tx){
    return each({
        terminal : false,
        input : fs.createReadStream('FOO_BAR.TXT')
    })(function(line) {
        return knex("dadosbrutos").insert({ // this table does exists
            AA_DATA : line.substring(0,8),
            BB_DATA : line.substring(8,16),
            CC_DATA : line.substring(36,44)
        }).transacting(tx);
    })
    .then(function() {
        tx.commit();
    });
});

相关内容

  • 没有找到相关文章

最新更新