我知道我们可以输入文件,并输出到mongo数据库。但是我的 mongodb 中有一个集合,我想将其作为输入,以便我可以将其与 ES 一起使用。这可能吗?谢谢。
我也有类似的问题,logstash-input-mongodb 插件很好,但它非常有限,似乎也不再维护它,所以,我选择了 logstash-integration-jdbc 插件。
我按照以下步骤将MongoDB集合与ES同步:
首先,我已经下载了由DBSchema开发的MongoDB的JDBC驱动程序,你可以在这里找到。
我准备了一个自定义 Dockerfile 来集成驱动程序和插件,如下所示:
FROM docker.elastic.co/logstash/logstash:7.9.2
RUN mkdir /usr/share/logstash/drivers
COPY ./drivers/* /usr/share/logstash/drivers/
RUN logstash-plugin install logstash-integration-jdbc
RUN logstash-plugin install logstash-output-elasticsearch
我已经配置了一个查询,该查询将每 30 秒执行一次,并将查找插入时间戳晚于最后一个查询时间戳的文档(提供参数 :sql_last_value)
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc2.3.jar"
jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
jdbc_user => "devroot"
jdbc_password => "devroot"
schedule => "*/30 * * * * *"
statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': :sql_last_value}},{'_id': false});"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
action => "create"
index => "processed_files"
hosts => ["elasticsearch:9200"]
user => "elastic"
password => "password"
ssl => true
ssl_certificate_verification => false
cacert => "/etc/logstash/keys/certificate.pem"
}
}
希望它可以帮助某人,问候
你可以设置一条河流将数据从MongoDB拉到Elasticsearch。
请参阅此处的说明 - http://www.codetweet.com/ubuntu-2/configuring-elasticsearch-mongodb/
我尝试了塞尔吉奥·桑切斯·桑切的解决方案建议,发现了以下更新和改进:
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc3.0.jar"
jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
jdbc_user => "devroot"
jdbc_password => "devroot"
schedule => "*/30 * * * * *"
statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': new ISODate(:sql_last_value)}},{'_id': false});"
}
}
output {
stdout {
codec => rubydebug
}
elasticsearch {
action => "update"
doc_as_upsert => true
document_id => "%{[document][uuid]}"
index => "processed_files"
hosts => ["elasticsearch:9200"]
user => "elastic"
password => "password"
ssl => true
ssl_certificate_verification => false
cacert => "/etc/logstash/keys/certificate.pem"
}
}
解释:
Mongodb 中的日期比较必须使用新的 ISODate 进行转换:sql_last_value
我想使用"更新"而不是"创建"来覆盖更新的情况。部分输入的查询结果为包含在"文件"中。假设您有一个具有唯一值的字段"uuid",你必须用它来识别文档,因为Mongodb的无论如何都不支持"_id"。
如果您有任何嵌入的文档也已"_id"归档,您也必须将其排除,例如
语句 => "db.profiles.find({'updateAt' : {'$gte': new ISODate(:sql_last_value)}},{'_id': false, 'embedded_doc._id': false}});"
所以显然,简短的答案是否定的,不可能从 Logstash 中的数据库输入。
编辑
@elssar感谢您的回答:
实际上,logstash 有一个第三方 mongodb 输入 - github.com/phutchins/logstash-input-mongodb – 埃尔萨