I am trying to insert logs from a file into a ClickHouse database. I have completed the configuration and tested it by manually appending a few lines to the file (see the command below); log lines that match the expected pattern get grok-matched and stored in Redis and the ClickHouse database. But when I try the same thing on a live log file that is continuously being written to, Logstash does not work. I don't know what I am doing wrong here and need guidance, as I have already spent a lot of time debugging.
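The manual test was simply appending a line to the watched file by hand, roughly like this:

echo "some log here" >> /tmp/testing.log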
Log sample
{"level":"info","msg":"Pid :: 468 :: ESQ :: INSERT_LOG USER :: 110876 QDATA :: 110876|||16230042103234123|||9|||20210607|||2021-06-07 15:44:11|||example@yahoo.com|||noreply@test.com|||184050|||Email dropped: User suppressed due to hard bounce|||||||||SMTP||||||||||||","time":"2021-06-07T15:44:12+05:30"}
Logstash.conf
input {
  beats {
    port => 5044
  }
}

filter {
  if "INSERT_LOG" in [message] {
    grok {
      match => [ "message", "%{GREEDYDATA:waste}:: ESQ :: INSERT_LOG USER :: %{NUMBER:userid1} QDATA :: %{NUMBER:userid}|||%{NUMBER:docid}|||%{NUMBER:stats}|||%{DATA:cdate}|||%{DATA:ctime}|||%{DATA:recipient_email}|||%{DATA:from_email}|||%{DATA:message_size}|||%{GREEDYDATA:Remarks}|||%{GREEDYDATA:cheader}|||%{DATA:subjectline}|||%{GREEDYDATA:emailType}|||%{GREEDYDATA:tag}|||%{GREEDYDATA:message_id}|||%{GREEDYDATA:scheduled_time}|||%{GREEDYDATA:acheaders} time:%{GREEDYDATA:waste2}" ]
    }
    mutate {
      add_field => { "USER" => "%{clientid}" }
      add_field => { "INDEX_TYPE" => "INSERT_LOG" }
      remove_field => "waste"
      remove_field => "waste2"
    }
  } else {
    mutate {
      add_field => { "INDEX_TYPE" => "OTHER" }
    }
  }
}

output {
  if [INDEX_TYPE] == "OTHER" {
    redis {
      data_type => "list"
      host => ["127.0.0.1:6379"]
      key => "EVENTS_TEST_CUSTOM"
    }
  } else {
    redis {
      data_type => "list"
      host => ["127.0.0.1:6379"]
      key => "EVENTS_TEST_CUSTOM_CH"
    }
    clickhouse {
      http_hosts => ["http://127.0.0.1:8123"]
      table => "logs.logs_table_filebeat"
      flush_size => 1000
      pool_max => 1000
    }
  }
}
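One way to check whether the grok pattern itself matches is to bypass Beats and the outputs entirely and run Logstash against stdin with a rubydebug stdout. A throwaway debug config along these lines works (the pattern here is a shortened stand-in for the full one above):

input {
  stdin { }
}

filter {
  grok {
    # Shortened stand-in for the full INSERT_LOG pattern above
    match => [ "message", "%{GREEDYDATA:waste}:: ESQ :: INSERT_LOG USER :: %{NUMBER:userid1} QDATA :: %{GREEDYDATA:qdata}" ]
  }
}

output {
  # Prints every parsed event; a failed match shows up as a _grokparsefailure tag
  stdout { codec => rubydebug }
}

Run it with bin/logstash -f debug.conf and paste a log line to see exactly which fields come out.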
Filebeat YML
filebeat.prospectors:
- input_type: log
  paths:
    - /usr/share/filebeat/logs/EVENTS.log
  exclude_files: ['.gz$']

output.logstash:
  hosts: ['127.0.0.1:5044']
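Note that filebeat.prospectors and input_type are the older 5.x-era names; on Filebeat 6.3 and later the equivalent configuration would be roughly:

filebeat.inputs:
- type: log
  paths:
    - /usr/share/filebeat/logs/EVENTS.log
  exclude_files: ['.gz$']

output.logstash:
  hosts: ['127.0.0.1:5044']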
After more debugging I found the cause: when I append to the log file with echo on the terminal, the line is a plain string, but the actual log lines are JSON, so the text my grok pattern expects sits inside the msg field rather than in the raw message. I needed to modify my logstash.conf to parse the JSON first: json { source => "message" }
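A minimal sketch of where that json filter sits in the filter block; the pattern below is a shortened stand-in for the full one above, and matching against the parsed msg field (rather than the raw message) is my reading of the fix, not verified against the original setup:

filter {
  # Parse the JSON envelope first; level, msg and time become event fields
  json {
    source => "message"
  }
  # Assumption: check and grok the extracted msg field, not the raw JSON line
  if "INSERT_LOG" in [msg] {
    grok {
      # Shortened stand-in for the full pattern in the original config
      match => [ "msg", "%{GREEDYDATA:waste}:: ESQ :: INSERT_LOG USER :: %{NUMBER:userid1} QDATA :: %{GREEDYDATA:qdata}" ]
    }
  }
}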