如何拥有 MongoDB for Logstash 类型的输入



我知道我们可以输入文件,并输出到mongo数据库。但是我的 mongodb 中有一个集合,我想将其作为输入,以便我可以将其与 ES 一起使用。这可能吗?谢谢。

我也有类似的问题,logstash-input-mongodb 插件很好,但它非常有限,似乎也不再维护它,所以,我选择了 logstash-integration-jdbc 插件。

我按照以下步骤将MongoDB集合与ES同步:

首先,我已经下载了由DBSchema开发的MongoDB的JDBC驱动程序,你可以在这里找到。

我准备了一个自定义 Dockerfile 来集成驱动程序和插件,如下所示:

FROM docker.elastic.co/logstash/logstash:7.9.2
RUN mkdir /usr/share/logstash/drivers
COPY ./drivers/* /usr/share/logstash/drivers/
RUN logstash-plugin install logstash-integration-jdbc
RUN logstash-plugin install logstash-output-elasticsearch

我已经配置了一个查询,该查询将每 30 秒执行一次,并将查找插入时间戳晚于最后一个查询时间戳的文档(提供参数 :sql_last_value

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc2.3.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
    jdbc_user => "devroot"
    jdbc_password => "devroot"
    schedule => "*/30 * * * * *"
    statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': :sql_last_value}},{'_id': false});"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    action => "create"
    index => "processed_files"
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "password"
    ssl => true
    ssl_certificate_verification => false
    cacert => "/etc/logstash/keys/certificate.pem"
  }
}

希望它可以帮助某人,问候

你可以设置一条河流将数据从MongoDB拉到Elasticsearch。

请参阅此处的说明 - http://www.codetweet.com/ubuntu-2/configuring-elasticsearch-mongodb/

我尝试了塞尔吉奥·桑切斯·桑切的解决方案建议,发现了以下更新和改进:

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/drivers/mongojdbc3.0.jar"
    jdbc_driver_class => "com.dbschema.MongoJdbcDriver"
    jdbc_connection_string => "jdbc:mongodb://devroot:devroot@mongo:27017/files?authSource=admin"
    jdbc_user => "devroot"
    jdbc_password => "devroot"
    schedule => "*/30 * * * * *"
    statement => "db.processed_files.find({ 'document.processed_at' : {'$gte': new ISODate(:sql_last_value)}},{'_id': false});"
  }
}
output {
  stdout {
    codec => rubydebug
  }
  elasticsearch {
    action => "update"
    doc_as_upsert => true
    document_id => "%{[document][uuid]}"
    index => "processed_files"
    hosts => ["elasticsearch:9200"]
    user => "elastic"
    password => "password"
    ssl => true
    ssl_certificate_verification => false
    cacert => "/etc/logstash/keys/certificate.pem"
  }
}

解释:

  • Mongodb 中的日期比较必须使用新的 ISODate 进行转换:sql_last_value

  • 我想使用"更新"而不是"创建"来覆盖更新的情况。部分输入的查询结果为包含在"文件"中。假设您有一个具有唯一值的字段"uuid",你必须用它来识别文档,因为Mongodb的无论如何都不支持"_id"。

  • 如果您有任何嵌入的文档也已"_id"归档,您也必须将其排除,例如

    语句 => "db.profiles.find({'updateAt' : {'$gte': new ISODate(:sql_last_value)}},{'_id': false, 'embedded_doc._id': false}});"

所以显然,简短的答案是否定的,不可能从 Logstash 中的数据库输入。

编辑

@elssar感谢您的回答:

实际上,logstash 有一个第三方 mongodb 输入 - github.com/phutchins/logstash-input-mongodb – 埃尔萨

最新更新