如何使用Elasticsearch通过时间序列日志确定ETL健康状况



tl; dr:什么是与此postgres查询相等的弹性搜索?

    SELECT latest_pipeline_logs.* FROM (
      SELECT pipeline_logs.*, 
      rank() OVER (
          PARTITION BY pipeline_name
          ORDER BY updated_at DESC
      )
      FROM pipeline_logs
    ) latest_pipeline_logs WHERE RANK = 1

我有数百个带有日志的ETL管道。它们各自以不同的间隔独立执行。我想使用Elasticsearch聚合为我的每个ETL管道提供简单的健康状况。

每个管道在执行时都会记录其状态。我目前的思考过程是根据发生的两个最重要的状态来确定每个管道的健康:succeededfailed

我知道我可以通过每个管道进行聚合查询和组,并具有一个子聚集状态的状态。例如,按照此目的的某些内容:

{
  ...
  "aggs": {
    "pipelines": {
      "field": "pipeline_name"
    },
    "aggs": {
      "states": {
        "terms": {
          "field": "pipeline_state"
        }
      }
    }
  }
}

上面示例的问题是,由于时间序列数据集,我可以得到多个状态,例如:

{
  "key": "some-pipeline-name",
  "buckets": [
    {
      "key": "succeeded",
      "doc_count": 123
    },
    {
      "key": "failed",
      "doc_count": 567
    }
  ]
}

我可以根据管道执行的日期从理论上过滤结果,但是由于某些管道每隔一个月左右运行一次,所以我认为这不是一个选择。

最终状态是使用Elasticsearch结果集驾驶简单的仪表板,该集合看起来像这样:

[
  {
    "key": "some-pipeline-name",
    "latest-status": "succeeded"
  },
  {
    "key": "some-other-pipeline",
    "latest-status": "failed"
  }
]

要注意的一件事是在这种用例中,历史数据并不重要。仪表板将简单地传达每个管道的最新状态。

您将如何使用Elasticsearch实现这一目标?

如果您仅对每个管道的最新状态感兴趣,则可以将top_hits用作子聚集,然后按时排序

{
  "size": 0,
  "aggs": {
    "pipeline": {
      "terms": {
        "field": "pipeline_name",
        "size": 1000
      },
      "aggs": {
        "top_hits_status": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "timestamp": {
                  "order": "desc"
                }
              }
            ],
            "_source": {
              "includes": [
                "pipeline_state"
              ]
            }
          }
        }
      }
    }
  }
}

最新更新