Elasticsearch - calculating the delay between timestamps



How can I calculate the delay between timestamps without Logstash, for example with script_fields?

For example, given these documents:

{
  "_source": {
    "name": "test1",
    "timestamp": "2021-12-30 12:30:00"
  }
}
{
  "_source": {
    "name": "test1",
    "timestamp": "2021-12-30 12:30:01"
  }
}
{
  "_source": {
    "name": "test1",
    "timestamp": "2021-12-30 12:30:03"
  }
}

I would like a new field called "time_taken", so the expected documents would look like this:

{
  "_source": {
    "name": "test1",
    "timestamp": "2021-12-30 12:30:00"
  }
}
{
  "_source": {
    "name": "test1",
    "timestamp": "2021-12-30 12:30:01",
    "time_taken": "1"
  }
}
{
  "_source": {
    "name": "test1",
    "timestamp": "2021-12-30 12:30:03",
    "time_taken": "2"
  }
}

The answer below is inspired by the Painless examples in Transforms.

This solution uses the Transform API and comes with a few limitations. I recommend reviewing them to see whether it fits your use case: Transform limitations.

First, I created a mapping for the provided example:

PUT myindex
{
  "mappings": {
    "properties": {
      "name": {
        "type": "text",
        "fields": {
          "keywords": {
            "type": "keyword"
          }
        }
      },
      "timestamp": {
        "type": "date"
      }
    }
  }
}
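
Note that the keywords sub-field is what the transform will group by later: a terms aggregation cannot run on an analyzed text field (without enabling fielddata), so a keyword sub-field is required.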

Then I indexed some documents:

POST myindex/_doc
{
  "name": "test1",
  "timestamp": "2022-01-27T19:48:11Z"
}
POST myindex/_doc
{
  "name": "test1",
  "timestamp": "2022-01-27T19:50:11Z"
}
POST myindex/_doc
{
  "name": "test1",
  "timestamp": "2022-01-27T19:53:11Z"
}
POST myindex/_doc
{
  "name": "test2",
  "timestamp": "2022-01-27T19:35:11Z"
}
POST myindex/_doc
{
  "name": "test2",
  "timestamp": "2022-01-27T19:36:11Z"
}

Using the Transform API, we can calculate the time span for each term:

POST _transform/_preview
{
  "source": {
    "index": "myindex"
  },
  "dest": {
    "index": "destindex"
  },
  "pivot": {
    "group_by": {
      "name": {
        "terms": {
          "field": "name.keywords"
        }
      }
    },
    "aggregations": {
      "latest_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_latest = 0L;",
          "map_script": """
            // keep the largest timestamp seen on this shard
            def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date > state.timestamp_latest) {
              state.timestamp_latest = current_date;
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            // take the largest timestamp across all shards
            def timestamp_latest = 0L;
            for (s in states) {
              if (s.timestamp_latest > timestamp_latest) {
                timestamp_latest = s.timestamp_latest;
              }
            }
            return timestamp_latest
          """
        }
      },
      "first_value": {
        "scripted_metric": {
          "init_script": "state.timestamp_first = 999999999999999L;",
          "map_script": """
            // keep the smallest timestamp seen on this shard
            def current_date = doc['timestamp'].getValue().toInstant().toEpochMilli();
            if (current_date < state.timestamp_first) {
              state.timestamp_first = current_date;
            }
          """,
          "combine_script": "return state",
          "reduce_script": """
            // take the smallest timestamp across all shards
            def timestamp_first = 999999999999999L;
            for (s in states) {
              if (s.timestamp_first < timestamp_first) {
                timestamp_first = s.timestamp_first;
              }
            }
            return timestamp_first
          """
        }
      },
      "time_length": {
        "bucket_script": {
          "buckets_path": {
            "min": "first_value.value",
            "max": "latest_value.value"
          },
          "script": "(params.max - params.min) / 1000"
        }
      }
    }
  }
}

The output looks like this:

{
  "preview" : [
    {
      "time_length" : 300.0,
      "name" : "test1",
      "first_value" : 1643312891000,
      "latest_value" : 1643313191000
    },
    {
      "time_length" : 60.0,
      "name" : "test2",
      "first_value" : 1643312111000,
      "latest_value" : 1643312171000
    }
  ],
  "generated_dest_index" : {
    "mappings" : {
      "_meta" : {
        "_transform" : {
          "transform" : "transform-preview",
          "version" : {
            "created" : "7.15.1"
          },
          "creation_date_in_millis" : 1643400080594
        },
        "created_by" : "transform"
      },
      "properties" : {
        "name" : {
          "type" : "keyword"
        }
      }
    },
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "auto_expand_replicas" : "0-1"
      }
    },
    "aliases" : { }
  }
}

What are the scripts doing?

As you can see, we group with a terms aggregation on the field name.keywords and use a scripted metric aggregation, which has four phases (see the minimal sketch after this list):

  • init_script: initializes a state object, a place to declare variables; each shard gets its own state
  • map_script: executed once per document, so you can iterate over documents or run more involved computations, much like in a general-purpose language such as Python or Java (avoid heavy computation here, or the aggregation will slow down)
  • combine_script: here we tell Elasticsearch what each shard should return, in our case the state
  • reduce_script: the final phase, where we iterate over the per-shard results of the previous step (the combine script) to compute the first/latest timestamp for each bucket
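
To see the four phases in isolation, here is a minimal sketch of a scripted metric aggregation run as a plain search against myindex; the aggregation name latest_timestamp is just for illustration. It returns the latest timestamp in epoch millis:

POST myindex/_search
{
  "size": 0,
  "aggs": {
    "latest_timestamp": {
      "scripted_metric": {
        "init_script": "state.latest = 0L;",
        "map_script": """
          // runs once per document, on the shard that holds it
          def t = doc['timestamp'].getValue().toInstant().toEpochMilli();
          if (t > state.latest) { state.latest = t; }
        """,
        "combine_script": "return state.latest",
        "reduce_script": """
          // runs once on the coordinating node, over one value per shard
          def latest = 0L;
          for (s in states) { if (s > latest) { latest = s; } }
          return latest
        """
      }
    }
  }
}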

Finally, in the bucket script we compute the difference between first_value and latest_value, dividing by 1000 because the timestamp field is stored in epoch millis; time_length is therefore expressed in seconds.
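
For the test1 bucket above, for example: (1643313191000 - 1643312891000) / 1000 = 300, which matches the time_length of 300.0 seconds in the preview.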

More information about the scripted metric aggregation: Scripted metric.
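
If the preview looks right, you can persist the results by creating the transform with the same source/dest/pivot body and then starting it; the transform id timestamp-delay below is just an illustration:

PUT _transform/timestamp-delay
{ ... same body as in the _preview call above ... }

POST _transform/timestamp-delay/_start

# check progress and stats
GET _transform/timestamp-delay/_stats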
