I have logs coming from various sources, in the following format:
[2018-11-20 11:27:41,187] {base_task.py:98} INFO - Subtask: [2018-11-20 11:27:41,186] {child_task.py:355} INFO - Inside poll job status
[2018-11-20 11:27:41,187] {base_task.py:98} INFO - Subtask: [2018-11-20 11:27:41,186] {child_task.py:357} DEBUG - Poll time out has been set to: 6 hr(s)
[2018-11-20 11:27:41,188] {base_task.py:98} INFO - Subtask: [2018-11-20 11:27:41,186] {child_task.py:369} DEBUG - Batch_id of the running job is = 123456
[2018-11-20 11:27:41,188] {base_task.py:98} INFO - Subtask: [2018-11-20 11:27:41,186] {child_task.py:377} DEBUG - Getting cluster ID for the cluster:
I want to push these logs to Elasticsearch with the index set to the batch_id. How can I achieve this? The problem is that batch_id appears only on some lines, not on all of them. I have already written a custom parser to convert the logs to JSON.
My td-agent.conf is:
<source>
  @type tail
  path /tmp/logs/airflow.logs
  pos_file /tmp/logs/airflow1.pos
  format /^\[(?<logtime>[^\]]*)\] \{(?<parent_script>[^ ]*)\} (?<parent_script_log_level>[^ ]*) - (?<subtask_name>[^ ]*): \[(?<subtask_log_time>[^\]]*)\] \{(?<script_name>[^ ]*)\} (?<script_log_info>[^ ]*) - (?<message>.*)/
  time_key logtime
  tag airflow_123
  read_from_head true
  include_tag_key true
  tag_key event_tag
  @log_level debug
</source>
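Since batch_id only appears on some lines, one option is to pull it out of the already-parsed message field with Fluentd's built-in parser filter. This is a sketch, not tested against your setup; it assumes the tag airflow_123 from the source above and that the message field holds the tail of each line. With reserve_data true the original fields are kept, and emit_invalid_record_to_error false lets records without a batch_id pass through untouched:

```conf
<filter airflow_123>
  @type parser
  key_name message
  reserve_data true
  emit_invalid_record_to_error false
  <parse>
    @type regexp
    # Only lines like "Batch_id of the running job is = 123456" gain a batch_id field
    expression /Batch_id of the running job is = (?<batch_id>\d+)/
  </parse>
</filter>
```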
<match airflow_123>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    host es_host
    port es_port
    index_name fluentd.${tag}.%Y%m%d
    <buffer tag, time>
      timekey 1h # one chunk per hour ("3600" also works)
    </buffer>
    type_name log
    with_transporter_log true
    @log_level debug
  </store>
</match>
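To index by batch_id, the fluent-plugin-elasticsearch output supports target_index_key, which takes the index name from a field of the record, with index_name as the fallback when that field is absent. The sketch below assumes you have already extracted batch_id into the record (for example with a parser filter); adapt the host/port placeholders to your cluster:

```conf
<store>
  @type elasticsearch
  host es_host
  port es_port
  # Use the record's batch_id field as the index name when present...
  target_index_key batch_id
  # ...otherwise fall back to this index
  index_name fluentd.${tag}.%Y%m%d
  <buffer tag, time>
    timekey 1h
  </buffer>
</store>
```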
Also, what are the best practices for log aggregation with an EFK stack?
If you want to stick with components of the Elastic stack, logs can be read, parsed, and persisted as follows:
- Filebeat: reads events (each logical line of a log) and pushes them to Logstash
- Logstash: parses the logs according to your requirements, breaking each string into meaningful fields. Strings can be parsed with the grok filter, which is preferable to building a custom parser. The parsed information is then sent to Elasticsearch to be persisted and indexed, ideally based on the timestamp
- Kibana: visualizes the parsed information with individual searches or aggregations
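As an illustration of the Logstash step, here is a hedged sketch of a grok filter matching the sample lines above. The field names are my own choices, and the pattern is untested against your full log stream, so treat it as a starting point rather than a drop-in config:

```conf
filter {
  grok {
    match => {
      "message" => "\[%{TIMESTAMP_ISO8601:logtime}\] \{%{DATA:parent_script}\} %{LOGLEVEL:parent_level} - %{DATA:subtask_name}: \[%{TIMESTAMP_ISO8601:subtask_log_time}\] \{%{DATA:script_name}\} %{LOGLEVEL:script_level} - %{GREEDYDATA:msg}"
    }
  }
}
```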