MongoDB to Elasticsearch indexing



卡在 elasticsearch 中索引数据收集的点上。

以下是我尝试从 mongo 索引数据的代码。

const elasticsearch = require('elasticsearch');
// instantiate an Elas
var bulk = [];

var MongoClient = require('mongodb').MongoClient;
var ObjectID = require('mongodb').ObjectID;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here
var esIndexName = 'new-collection'; // Elasticsearch index name will go here
var bulk = [];
const client = new elasticsearch.Client({
hosts: [ 'http://localhost:9200']
});
// ping the client to be sure Elasticsearch is up
client.ping({
requestTimeout: 30000,
}, function(error) {
// At this point, eastic search is down, please check your Elasticsearch service
if (error) {
console.error('Elasticsearch cluster is down!');
} else {
console.log('Everything is ok');
}
});

MongoClient.connect(connectionString+mongoDBName, function(err, db) {
if(err) throw err;
// for each object in a collection
var collection = db.collection(mongoCollectionName);
var counter = 0;
collection.find().each(function(err, item, response, status) {
console.log(item)
Array.from(item).forEach(itemdata => {
bulk.push({index:{ 
_index: esIndexName, 
_type: mongoCollectionName,
}          
})
bulk.push(itemdata)
})
//perform bulk indexing of the data passed
client.bulk({body:bulk}, function( err, response  ){ 
if( err ){ 
console.log("Failed Bulk operation".red, err) 
} else { 
console.log("Successfully imported %s".green, mongoCollectionName.length); 
} 
console.log(response);
});
if(item != null) {    
if(counter % 100 == 0) console.log( "Syncing object id: "+ item['_id'] + " #: " + counter);
client.indices.create(
{ index: esIndexName },
function(error, response) {
if (error) {
console.log(error);
} else {
console.log("created a new index", response);
}
}
);
}
counter += 1;
});
});

所以在这里我尝试将数据索引到 elasticsearch 中,我能够创建集合索引,但未能将数据插入弹性搜索的索引中。谁能在这里帮我? 我哪里错了,我在这里犯了什么错误。 我在这里使用 nodejs,只是简单的函数进行测试,稍后将添加 lambda 函数进行更新/删除以及任何更改。

首先,我建议整理你的代码;很难看出块是如何嵌套的。

现在,您的代码存在几个问题:

  1. 你为什么要做Array.from(item).forEach(itemdata => {item是来自 Mongo 的文档对象,因此对其执行Array.from操作不起作用。
  2. 你在.each回调中调用bulkAPI;这意味着你将对每个文档进行一个API调用。我不认为这是你想要的。
  3. 您将在批量操作创建索引。这是错误的。在插入文档之前,应一劳永逸地创建 ES 索引。这很重要,因为将来您需要使用更高级的配置来处理文档。
  4. 对 ES 的ping调用很好,但如果集群关闭,它不会阻止其余代码运行。

所以你应该怎么做:

  1. 在循环访问文档之前创建 ES 索引。
  2. 遍历你的MongoDB文档,并将它们累积到你的身体对象中。
  3. 当您有一批n文档时,请调用bulkAPI 并重置正文。

这是您正在寻找的解决方案

索引.js

//MongoDB client config
var MongoClient = require('mongodb').MongoClient;
var mongoDBName = 'mydb'; // Name of mongodb goes here
var mongoCollectionName = 'mycollection'; // Collection name of mongodb goes here
var connectionString = 'mongodb://127.0.0.1:27017/'; // put username and password for mongo here
//Elasticsearch client config
const { Client } = require('@elastic/elasticsearch')
const esClient = new Client({ node: 'http://localhost:9200' });
var esIndexName = 'new-collection'; // Elasticsearch index name will go here
let bulk = [];
async function indexData() {
const client = await MongoClient.connect(connectionString, { useNewUrlParser: true })
.catch(err => { console.log(err); });
if (!client) {
return;
}
try {
const db = client.db(mongoDBName);
let collection = db.collection(mongoCollectionName);
await collection.find().forEach((doc) => {
bulk.push({
index: {
_index: esIndexName,
}
})
let { _id, ...data } = doc;
bulk.push(data);
})
console.log(bulk);
await esClient.indices.create({
index: esIndexName,
}, { ignore: [400] })
const { body: bulkResponse } = await esClient.bulk({ refresh: true, body: bulk })
if (bulkResponse.errors) {
const erroredDocuments = []
// The items array has the same order of the dataset we just indexed.
// The presence of the `error` key indicates that the operation
// that we did for the document has failed.
bulkResponse.items.forEach((action, i) => {
const operation = Object.keys(action)[0]
if (action[operation].error) {
erroredDocuments.push({
// If the status is 429 it means that you can retry the document,
// otherwise it's very likely a mapping error, and you should
// fix the document before to try it again.
status: action[operation].status,
error: action[operation].error,
operation: bulk[i * 2],
document: bulk[i * 2 + 1]
})
}
})
console.log(erroredDocuments)
}
const { body: count } = await esClient.count({ index: esIndexName })
console.log(count)
} catch (err) {
console.log(err);
} finally {
client.close();
}
}
indexData();

包.json

{
"name": "elastic-node-mongo",
"version": "1.0.0",
"description": "Simple example to connect ElasticSearch, MongoDB and NodeJS",
"main": "index.js",
"dependencies": {
"@elastic/elasticsearch": "^7.3.0",
"mongodb": "^3.3.2",
"nodemon": "1.18.3"
},
"scripts": {
"dev": "nodemon",
"start": "node index.js"
},
"keywords": [
"nodejs",
"node",
"mongodb",
"elasticsearch",
"docker"
],
"author": "Sathishkumar Rakkiasmy",
"license": "ISC"
}

澄清

我能够创建集合索引,但无法插入数据 在弹性搜索的索引中。

上面的句子是有道理的。因为bulk变量是不变的。

请参阅下面的链接,为什么bulk变量不变。

为什么在函数内部修改变量后,我的变量保持不变? - 异步代码参考

如何从异步调用返回响应?

了解有关异步编程的更多信息

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous

https://developer.mozilla.org/en-US/docs/Learn/JavaScript/Asynchronous/Async_await

你可以制作 logstash 将数据从 mongo db 导入到 elasticsearch。请查找附件配置供您参考。

input {
mongodb {
codec => “json”
uri => ‘mongodb://localhost:27017/NewDb’
placeholder_db_dir => ‘/home/devbrt.shukla/Desktop/scalaoutput/ELK/logstash-6.4.1/db_dir’
placeholder_db_name => ‘Employee_sqlite.db’
collection => ‘Employee’
batch_size => 5000
generateId => ‘true’
parse_method => “simple”
}
}
filter {
mutate {
remove_field => [ “_id” ]
}
}
output {
elasticsearch {
hosts => [“localhost:9200”]
index => “employee-%{+YYYY.MM.dd}”
}
stdout { codec => rubydebug } }

在 Logstash 中,我们将输入、过滤和输出三个部分。

输入:是从sql,mongodb,mysql等获取数据。
过滤器:在本节中,我们可以构建自定义的 json 以索引到 elasticsearch 中。
输出:在本节中,我们将输入输出部分的索引名称、文档类型和 IP 地址,即 elasticsearch。

最新更新