从开头重新加载相同的文件,而无需重新启动logstash



我通过将sincedb_path指向NUL(Windows环境)并在开始时设置start_position来强制logstash重新加载整个文件。这是我的file input配置:

input {
     file {
        path => "myfile.csv"
        start_position => beginning
        ignore_older => 0
        type => "my_document_type"
        sincedb_path => "NUL"
        stat_interval => 1
    }
}

实际上每次我重新启动LogStash并修改文件时,该文件实际上都会重新加载,但是我希望将其重新加载,如stat_interval中所述。
我也需要重新加载它,即使没有修改也不重新启动LogStash,我也需要重新加载,因为我在过滤器中添加了基于日期的字段,并且每天都需要使用更新的date_field

进行相同的数据。
filter {
    csv {
        columns => ["MyFirstColumn", "MySecondColumn"]
        separator => ";"
        add_field => {
        "date_field" => "%{+ddMMyyy}"
        }
    }
}  

这是预期行为的一个示例:

文件内容:

Column A;Column B
Value X;Value Y  

发送到弹性搜索索引的数据:

Column A : Value X, Column B : Value Y, date_field : 05122016

第二天,即使没有修改文件,我希望将以下数据添加到Elasticsearch中的同一索引:

Column A : Value X, Column B : Value Y, date_field : 06122016

我最终使用了exec输入,而不是file,并启动了cat命令每2秒钟读取文件。CAT命令检索整个文件内容,因此我使用第一个split过滤器分别检索每行,然后使用csv过滤器将列分开。这是我的配置文件内容:

input {
    exec {
        command => "cat myfile.csv"
        interval => 2
        add_field => {
              "tag" => "mytag"
        }
    }
}
filter {    
    if [tag] == "mytag" {
        split {
            terminator => "n"
        }
        csv {
            columns => ["myFirstColumn", "mySecondColumn", "mythirdColumn"]
            separator => ";"        
        }
}
output {
    if [tag] == "mytag" {
        elasticsearch {
            hosts => [ "localhost:9200" ]
            index => "myIndex"
        }
    }
}  

原始答案可在弹性讨论平台

中获得

最新更新