如何删除logstash中的动态mongodboid?



Mongodb有两种类型的$oid引用-

类型1-

//MongoDB
city_id : "5fe3206428bf745876649fd3"
//Kafka Message
city_id : {
"$oid": "5fe3206428bf745876649fd3"
}

类型2-

//MongoDB
city_ids : ["5fe3206428bf745876649fd3","5fe3206428bf745876649fd3","5fe3206428bf745876649fd3"]
//Kafka Message
city_ids : [
{
"$oid": "5fe3206428bf745876649fd3"
},
{
"$oid": "5fe3206428bf745876649fd3"
},
{
"$oid": "5fe3206428bf745876649fd3"
}
]

如何在logstash中处理这两种类型,以便在MongoDB中保存弹性搜索的确切数据结构。

input {
kafka {
bootstrap_servers => "localhost:9092"
decorate_events => true
topics => ["users","organisations","cities"]
}
}
filter { 
json {
source => "message"
target => "json_payload"
}
json {
source => "[json_payload][payload]"
target => "payload"
}

mutate {
rename => { "[payload]" => "document"}
remove_field => ["message","json_payload","payload"]
add_field => {
"[es_index]" => "%{[@metadata][kafka][topic]}" 
"[mongo_id]" => "%{[document][_id][$oid]}"
}
}
}
output {
elasticsearch {
hosts => ["localhost:9200"]
index => "%{es_index}"
document_id => "%{mongo_id}"
}
stdout {
codec =>
rubydebug {
metadata => true
}
}
}

这是上一个问题的后续内容。

下面将对包含$oid条目的每个字段动态执行此操作。它对结构做了很多假设——如果它包含$oid条目,那么就只保留了这些条目。

ruby {
code => '
event.to_hash.each { |k, v|
if v.is_a? Hash
if v["$oid"]
event.set(k, v["$oid"])
end
end
if v.is_a? Array
if v[0]["$oid"]
a = []
v.each { |x| a << x["$oid"] }
event.set(k, a)
end
end
}
'
}

最新更新