Importing CSV with Logstash: date is not parsed as datetime type



I am trying to import a CSV into Elasticsearch using Logstash. I have tried two approaches:

  1. Using CSV
  2. Using the grok filter

1) For CSV, below is my Logstash file:

input {
  file {
    path => "path_to_my_csv.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
        separator => ","
        columns => ["col1","col2_datetime"]
  }
  mutate {convert => [ "col1", "float" ]}
  date {
        locale => "en"
        match => ["col2_datetime", "ISO8601"] # tried this one also - match => ["col2_datetime", "yyyy-MM-dd HH:mm:ss"]
        timezone => "Asia/Kolkata"
        target => "@timestamp" # tried this one also - target => "col2_datetime"
   }
}
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "my_collection"
  }
  stdout {}
}

2) Using the grok filter:

For the grok filter, below is my Logstash file:

input {
  file {
    path => "path_to_my_csv.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  grok {
    match => { "message" => "(?<col1>(?:%{BASE10NUM})),(%{TIMESTAMP_ISO8601:col2_datetime})"}
    remove_field => [ "message" ]
  }
  date {
        match => ["col2_datetime", "yyyy-MM-dd HH:mm:ss"]
   }
}
output {
   elasticsearch {
     hosts => "http://localhost:9200"
     index => "my_collection_grok"
  }
  stdout {}
}

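Problem:

So, when I run the two files separately, I am able to import the data into Elasticsearch. But my date field is not parsed as a datetime type; instead it has been saved as a string, and as a result I cannot run date filters on it.

So can someone help me figure out why this is happening? My Elasticsearch version is 5.4.1.

Thanks in advance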

I made 2 changes to your config file.

1) Removed the underscore in the column name col2_datetime

2) Added target

Here is what my config file looks like...

vi logstash.conf
input {
  file {
    path => "/config-dir/path_to_my_csv.csv"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}
filter {
  csv {
        separator => ","
        columns => ["col1","col2"]
  }
  mutate {convert => [ "col1", "float" ]}
  date {
        locale => "en"
        match => ["col2",  "yyyy-MM-dd HH:mm:ss"]
        target => "col2"
   }
}
output {
   elasticsearch {
     hosts => "http://172.17.0.1:9200"
     index => "my_collection"
  }
  stdout {}
}

Here is the data file:

vi path_to_my_csv.csv
1234365,2016-12-02 19:00:52 
1234368,2016-12-02 15:02:02 
1234369,2016-12-02 15:02:07
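
An offline pre-check for the data file in the answer above, added here as an example and not part of the original question or answer: it applies the answer's `yyyy-MM-dd HH:mm:ss` pattern to each CSV row and reports the rows that do not match, which would otherwise stay as plain strings. The function name `check_rows`, the fixed +05:30 offset standing in for `Asia/Kolkata`, and the sample rows are assumptions made for illustration.

```python
import csv
from datetime import datetime, timedelta, timezone

# Assumption: Asia/Kolkata is a fixed UTC+05:30 offset (no DST), so a fixed
# offset stands in for the "timezone" option used in the question's config.
IST = timezone(timedelta(hours=5, minutes=30))

# strptime equivalent of the pattern "yyyy-MM-dd HH:mm:ss" from the config.
PATTERN = "%Y-%m-%d %H:%M:%S"

# Constructed sample: two rows in the answer's format (one with a trailing
# space) plus two rows that do not match the pattern.
SAMPLE = [
    "1234365,2016-12-02 19:00:52 ",
    "1234368,2016-12-02 15:02:02",
    "1234369,2016-12-02T15:02:07",
    "1234370,02/12/2016 15:02:07",
]

def check_rows(lines, tz=IST):
    """Return one dict per CSV row: parsed values, or an 'error' on failure."""
    events = []
    for lineno, row in enumerate(csv.reader(lines), start=1):
        event = {"line": lineno}
        try:
            col1, col2 = row                      # exactly two columns expected
            event["col1"] = float(col1)           # same as mutate/convert to float
            # Trailing whitespace is stripped here before matching; this check
            # does not test whether the pipeline itself tolerates it.
            local = datetime.strptime(col2.strip(), PATTERN).replace(tzinfo=tz)
            # Store the value as a UTC ISO 8601 timestamp.
            event["col2"] = local.astimezone(timezone.utc).isoformat()
        except ValueError as exc:
            event["error"] = str(exc)
        events.append(event)
    return events

if __name__ == "__main__":
    for ev in check_rows(SAMPLE):
        print(ev)
```

In Logstash itself, events whose date filter did not match carry the `_dateparsefailure` tag in the stdout output.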
