使用logstash在弹性搜索中将两个索引合并到第三个索引中



我有两个索引

  1. employee_data{"code":1, "name":xyz, "city":"Mumbai" }
  2. 事务_数据{"code":1, "Month":June", payment:78000 }

我想要这样的第三个索引3( join_index

{"code":1, "name":xyz, "city":"Mumbai", "Month":June", payment:78000 }怎么可能??

我正在试用logstash

input {
elasticsearch {
hosts => "localost"
index => "employees_data,transaction_data"

query => '{ "query": { "match": { "code": 1} } }'
scroll => "5m"
docinfo => true
}
}
output {

弹性搜索{主机=>["localhost"]

index => "join1"
}

}

您可以在employees_data上使用弹性搜索输入

在过滤器中,使用transaction_data上的弹性搜索过滤器

input {
elasticsearch {
hosts => "localost"
index => "employees_data"

query => '{ "query": { "match_all": { } } }'
sort => "code:desc"
scroll => "5m"
docinfo => true
}
}
filter {
elasticsearch {
hosts => "localhost"
index => "transaction_data"
query => "(code:"%{[code]}"
fields => { 
"Month" => "Month",
"payment" => "payment" 
}
}
}
output {
elasticsearch { 
hosts => ["localhost"]
index => "join1"
}
}

并将您的新文档发送到具有弹性搜索输出的第三个索引

你将有3个弹性搜索连接,结果可能会有点慢。但它是有效的。

您不需要Logstash来实现这一点,Elasticsearch本身通过利用enrich processor来支持这一点。

首先,您需要创建一个丰富策略(使用最小的索引,假设它是employees_data(:

PUT /_enrich/policy/employee-policy
{
"match": {
"indices": "employees_data",
"match_field": "code",
"enrich_fields": ["name", "city"]
}
}

然后,您可以执行该策略来创建一个浓缩索引

POST /_enrich/policy/employee-policy/_execute

创建并填充富集索引后,下一步需要创建一个使用以上富集策略/索引的摄取管道:

PUT /_ingest/pipeline/employee_lookup
{
"description" : "Enriching transactions with employee data",
"processors" : [
{
"enrich" : {
"policy_name": "employee-policy",
"field" : "code",
"target_field": "tmp",
"max_matches": "1"
}
},
{
"script": {
"if": "ctx.tmp != null",
"source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
}
}
]
}

最后,您现在可以使用联接的数据创建目标索引了。只需将_reindexAPI与我们刚刚创建的摄取管道结合使用即可:

POST _reindex
{
"source": {
"index": "transaction_data"
},
"dest": {
"index": "join1",
"pipeline": "employee_lookup"
}
}

运行此操作后,join1索引将包含您所需要的内容,例如:

{
"_index" : "join1",
"_type" : "_doc",
"_id" : "0uA8dXMBU9tMsBeoajlw",
"_score" : 1.0,
"_source" : {
"code":1, 
"name": "xyz", 
"city": "Mumbai", 
"Month": "June", 
"payment": 78000 
}
}

据我所知,仅仅使用弹性搜索API是不可能实现这一点的。要处理此问题,您需要为相关文档设置一个唯一的ID。例如,您在问题中提到的代码可以作为文档的良好ID。因此,您可以将第一个索引重新索引到第三个索引,并使用UPDATE API通过从第二个索引中读取文档来更新它们,并通过它们的ID将它们更新到第三索引中。我希望我能帮上忙。

最新更新