Elasticsearch 批量(bulk)API 的 POST 请求中出现换行符(newline)错误



我正在尝试使用 Elasticsearch 的批量(bulk)API 将多条记录插入到索引中。我的 JSON 看起来像这样:请求JSON

我在文档末尾插入了一个换行符(\n),但仍然收到了换行符错误(newline error):

Error: {
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "The bulk request must be terminated by a newline [\n]"
}
],
"type": "illegal_argument_exception",
"reason": "The bulk request must be terminated by a newline [\n]"
},
"status": 400
}

基于我之前的回答和https://stackoverflow.com/a/50754789/8160318:

// AWS SDK entry point; every AWS.* helper used by this script hangs off it.
const AWS = require('aws-sdk');

// Name of the index that the bulk action lines will target.
const INDEX_NAME = 'index_name';

// Connection settings for the search domain (placeholder values).
const esDomain = {
  endpoint: 'yoursearchdomain.region.amazonaws.com',
  region: 'us-east-1',
  index: 'myindex',
  doctype: 'mytype'
};

// Credentials provider constructed with the 'AWS' prefix; used later to sign the request.
const creds = new AWS.EnvironmentCredentials('AWS');

// Endpoint wrapper and the (still empty) HTTP request addressed to it.
const endpoint = new AWS.Endpoint(esDomain.endpoint);
const req = new AWS.HttpRequest(endpoint);

// Recover the documents from the mangled payload: the newlines that used to
// separate the ndjson lines are gone, so action lines (`{"index":{}}`) and
// documents are now separated by single spaces.
const docs_as_body_params = JSON.parse(
  '[' +
    `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`
      // Split on every action line. The capture group keeps the separators in
      // the result; `\s?` absorbs the space in front of all but the first one,
      // and the braces are escaped so they are matched literally.
      .split(/(\s?\{"index":\{\}\} )/g)
      // Drop the empty string produced in front of the leading separator.
      .filter((match) => match.length)
      // Separators now sit at even indexes, documents at odd ones.
      .filter((_, index) => index % 2 !== 0) +
    // Concatenating the array coerces it to a comma-joined string, i.e. the
    // element list of a JSON array.
    ']'
);
// Build the bulk request body: each document is preceded by an `index` action
// line naming the target index and the document id.
const bulk_body = [];
for (const doc of docs_as_body_params) {
  bulk_body.push({
    index: {
      _index: INDEX_NAME,
      // Falls back to null when the document carries no truthy `id`.
      _id: doc.id || null
    }
  });
  bulk_body.push(doc);
}

/// THE MOST IMPORTANT PART -- getting to a valid ndjson:
/// one JSON value per line, lines separated by a real newline character ('\n'),
/// and a trailing newline after the last line. Without that final '\n' the
/// bulk endpoint rejects the request ("must be terminated by a newline").
const ndjson_payload =
  // Explicit arrow so that map's (index, array) arguments are not forwarded
  // to JSON.stringify as its replacer/space parameters.
  bulk_body.map((entry) => JSON.stringify(entry)).join('\n') + '\n';
// Describe the HTTP request: POST the ndjson payload to the bulk endpoint.
req.method = 'POST';
// Request paths must be absolute, hence the leading slash.
req.path = '/_bulk';
req.region = esDomain.region;
req.headers['presigned-expires'] = false;
req.headers['Host'] = endpoint.host;
req.headers['Content-Type'] = 'application/json';
req.body = ndjson_payload;

// Sign the request (Signature V4, service name 'es') with the credentials.
const signer = new AWS.Signers.V4(req, 'es');
signer.addAuthorization(creds, new Date());

// Send the request and buffer the response body until the stream ends.
// NOTE(review): `context` is not defined in this snippet -- presumably the
// Lambda handler's context object supplied by an enclosing handler; confirm.
const send = new AWS.NodeHttpClient();
send.handleRequest(req, null, function (httpResp) {
  let respBody = '';
  httpResp.on('data', function (chunk) {
    respBody += chunk;
  });
  httpResp.on('end', function () {
    console.log('Response: ' + respBody);
    // The original referenced an undefined `doc` here; report the number of
    // documents that were sent instead.
    context.succeed('Lambda added ' + docs_as_body_params.length + ' documents');
  });
}, function (err) {
  console.log('Error: ' + err);
  context.fail('Lambda failed with error ' + err);
});

您的 JSON 原本应该是 ndjson(以换行符分隔的 JSON),但现在看起来一团糟,所以我们必须事先进行一些清理。

初始化:

// Official Elasticsearch client for Node.js.
const { Client } = require("@elastic/elasticsearch");

// Name of the index that the bulk action lines will target.
const INDEX_NAME = 'index_name';

// Client bound to a single node at the given URL.
const client = new Client({ node: 'http://localhost:9200' });

将潜在的ndjson转换为可消耗的数组或对象:

// Recover the documents from the mangled payload: the newlines that used to
// separate the ndjson lines are gone, so action lines (`{"index":{}}`) and
// documents are now separated by single spaces.
const docs_as_body_params = JSON.parse(
  '[' +
    `{"index":{}} {"tags":["ab","cd"],"question":"test this","answer":"answer first"} {"index":{}} {"tags":["de","fg"],"question":"test second","answer":"answer second"}`
      // split on every action line; the capture group keeps the separators in
      // the result, `\s?` absorbs the space in front of all but the first one,
      // and the braces are escaped so they are matched literally
      .split(/(\s?\{"index":\{\}\} )/g)
      // filter out empty strings
      .filter((match) => match.length)
      // take every odd member (skipping `{"index":{}}`)
      .filter((_, index) => index % 2 !== 0) +
    // concatenating the array coerces it to a comma-joined string, i.e. the
    // element list of a JSON array
    ']'
);

构建批量(bulk)请求体:

// Bulk request body: for every document, an `index` action entry followed by
// the document itself.
const bulk_body = [];
for (const doc of docs_as_body_params) {
  const action = {
    index: {
      _index: INDEX_NAME,
      // Falls back to null when the document carries no truthy `id`.
      _id: doc.id || null
    }
  };
  bulk_body.push(action, doc);
}

执行批量索引:

// Send the bulk request and report the outcome.
client.bulk(
  {
    body: bulk_body
  },
  (err, resp) => {
    // Transport/request-level failure: log it and stop, since there is no
    // usable response body to inspect.
    if (err) {
      console.error(err);
      return;
    }
    // The parsed response lives under `resp.body` (the original already read
    // `resp.body.items`), so the `errors` flag is read from there as well.
    const { errors, items } = resp.body;
    if (errors) {
      // `errors` only says that at least one item failed; log the failed
      // items themselves (those whose `index` result carries an `error`).
      console.error(items.filter((item) => item.index && item.index.error));
    }
    console.info(items);
  }
);

相关内容

  • 没有找到相关文章

最新更新