我有两个索引
- employee_data
{"code":1, "name":xyz, "city":"Mumbai" }
- 事务_数据
{"code":1, "Month":June", payment:78000 }
我想要这样的第三个索引3( join_index
{"code":1, "name":xyz, "city":"Mumbai", "Month":June", payment:78000 }
怎么可能??
我正在试用logstash
input {
elasticsearch {
hosts => "localost"
index => "employees_data,transaction_data"
query => '{ "query": { "match": { "code": 1} } }'
scroll => "5m"
docinfo => true
}
}
output {
弹性搜索{主机=>["localhost"]
index => "join1"
}
}
您可以在employees_data上使用弹性搜索输入
在过滤器中,使用transaction_data上的弹性搜索过滤器
input {
elasticsearch {
hosts => "localost"
index => "employees_data"
query => '{ "query": { "match_all": { } } }'
sort => "code:desc"
scroll => "5m"
docinfo => true
}
}
filter {
elasticsearch {
hosts => "localhost"
index => "transaction_data"
query => "(code:"%{[code]}"
fields => {
"Month" => "Month",
"payment" => "payment"
}
}
}
output {
elasticsearch {
hosts => ["localhost"]
index => "join1"
}
}
并将您的新文档发送到具有弹性搜索输出的第三个索引
你将有3个弹性搜索连接,结果可能会有点慢。但它是有效的。
您不需要Logstash来实现这一点,Elasticsearch本身通过利用enrich processor
来支持这一点。
首先,您需要创建一个丰富策略(使用最小的索引,假设它是employees_data
(:
PUT /_enrich/policy/employee-policy
{
"match": {
"indices": "employees_data",
"match_field": "code",
"enrich_fields": ["name", "city"]
}
}
然后,您可以执行该策略来创建一个浓缩索引
POST /_enrich/policy/employee-policy/_execute
创建并填充富集索引后,下一步需要创建一个使用以上富集策略/索引的摄取管道:
PUT /_ingest/pipeline/employee_lookup
{
"description" : "Enriching transactions with employee data",
"processors" : [
{
"enrich" : {
"policy_name": "employee-policy",
"field" : "code",
"target_field": "tmp",
"max_matches": "1"
}
},
{
"script": {
"if": "ctx.tmp != null",
"source": "ctx.putAll(ctx.tmp); ctx.remove('tmp');"
}
}
]
}
最后,您现在可以使用联接的数据创建目标索引了。只需将_reindex
API与我们刚刚创建的摄取管道结合使用即可:
POST _reindex
{
"source": {
"index": "transaction_data"
},
"dest": {
"index": "join1",
"pipeline": "employee_lookup"
}
}
运行此操作后,join1
索引将包含您所需要的内容,例如:
{
"_index" : "join1",
"_type" : "_doc",
"_id" : "0uA8dXMBU9tMsBeoajlw",
"_score" : 1.0,
"_source" : {
"code":1,
"name": "xyz",
"city": "Mumbai",
"Month": "June",
"payment": 78000
}
}
据我所知,仅仅使用弹性搜索API是不可能实现这一点的。要处理此问题,您需要为相关文档设置一个唯一的ID。例如,您在问题中提到的代码可以作为文档的良好ID。因此,您可以将第一个索引重新索引到第三个索引,并使用UPDATE API通过从第二个索引中读取文档来更新它们,并通过它们的ID将它们更新到第三索引中。我希望我能帮上忙。