Elasticsearch: map the result of a collapse/action performed on grouped documents



There is a list of conversations, and each conversation has a list of messages. Every message has different fields plus an action field. What we need to take into account is that in the first message of a conversation action A is used, a few messages later action A.1 is used, some time after that A.1.1, and so on (it is a list of chatbot intents).

Grouping the message actions of one conversation would look something like: A > A > A > A.1 > A > A.1 > A.1.1 ...

The problem:

I need to create a report with Elasticsearch that returns the actions group for every conversation; next, I need to group the similar actions groups and add a count; in the end the result should be something like Map<actionsGroup, count>.

Building the actions group requires removing the consecutive duplicates from each group; I need A > A.1 > A > A.1 > A.1.1 and not A > A > A > A.1 > A > A.1 > A.1.1.
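
For illustration, this is the consecutive-duplicate removal I have in mind, as a minimal client-side TypeScript sketch (it assumes the actions of a single conversation are already sorted by @timestamp):

// Collapses consecutive duplicates, so ["A","A","A","A.1","A","A.1","A.1.1"]
// becomes "A > A.1 > A > A.1 > A.1.1".
function buildActionsGroup(actions: string[]): string {
  const deduped: string[] = [];
  for (const action of actions) {
    if (deduped.length === 0 || deduped[deduped.length - 1] !== action) {
      deduped.push(action);
    }
  }
  return deduped.join(' > ');
}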

What I have started with:

{
  "collapse": {
    "field": "context.conversationId",
    "inner_hits": {
      "name": "logs",
      "size": 10000,
      "sort": [
        {
          "@timestamp": "asc"
        }
      ]
    }
  },
  "aggs": {
  }
}

What I need next:

  1. I need to map the collapsed results into a single result per conversation, like A > A.1 > A > A.1 > A.1.1. I have seen that an aggregation with a script could build an actions list like the one I need, but an aggregation runs over all messages, not only over the messages grouped by the collapse. Is it possible to run an aggregation inside the collapse, or is there a similar solution? (A client-side alternative is sketched right after this list.)
  2. I need to group all the collapsed result values (A > A.1 > A > A.1 > A.1.1), add a count and end up with Map<actionsGroup, count>.
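
A possible client-side fallback for both points, assuming the collapse query above (with inner_hits named logs) and reusing buildActionsGroup from the sketch further up:

// Hypothetical post-processing of the collapse response: with collapse on
// context.conversationId, each top-level hit represents one conversation and
// carries its messages, sorted by @timestamp, in inner_hits.logs.
function groupsFromCollapse(hits: any[]): Map<string, number> {
  const counts = new Map<string, number>();
  for (const hit of hits) {
    const actions = hit.inner_hits.logs.hits.hits.map(
      (message: any) => message._source.context.action
    );
    const group = buildActionsGroup(actions); // consecutive-duplicate removal
    counts.set(group, (counts.get(group) ?? 0) + 1);
  }
  return counts;
}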

Or:

  1. Group the conversation messages by the conversationId field with an aggregation (I don't know how to do this; see the sketch after this list).
  2. Use a script to iterate over all the values and create the actions group for each conversation. (Not sure whether this is possible.)
  3. Use another aggregation over all the values, group the duplicates and return Map<actionsGroup, count>.
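
A minimal sketch of step 1, using only the fields from the mapping below: a terms aggregation on context.conversationId with a top_hits sub-aggregation that returns the messages of each conversation in timestamp order. Steps 2 and 3 would still have to happen on the client or in a scripted_metric, as in the solution further down.

// Request body sketch: one bucket per conversationId, each bucket carrying its
// messages sorted by @timestamp (top_hits is capped at 100 hits per bucket by default).
const groupMessagesByConversation = {
  size: 0,
  aggs: {
    conversations: {
      terms: { field: 'context.conversationId', size: 10000 },
      aggs: {
        messages: {
          top_hits: {
            size: 100,
            sort: [{ '@timestamp': 'asc' }],
            _source: ['context.action'],
          },
        },
      },
    },
  },
};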

Mapping:

"mappings":{
"properties":{
"@timestamp":{
"type":"date",
"format": "epoch_millis"
}
"context":{
"properties":{
"action":{
"type":"keyword"
},
"conversationId":{
"type":"keyword"
}
}
}
}
}

Sample conversation documents:

Conversation 1.
{
"@timestamp": 1579632745000,
"context": {
"action": "A",
"conversationId": "conv_id1",
}
},
{
"@timestamp": 1579632745001,
"context": {
"action": "A.1",
"conversationId": "conv_id1",
}
},
{
"@timestamp": 1579632745002,
"context": {
"action": "A.1.1",
"conversationId": "conv_id1",
}
}
Conversation 2.
{
"@timestamp": 1579632745000,
"context": {
"action": "A",
"conversationId": "conv_id2",
}
},
{
"@timestamp": 1579632745001,
"context": {
"action": "A.1",
"conversationId": "conv_id2",
}
},
{
"@timestamp": 1579632745002,
"context": {
"action": "A.1.1",
"conversationId": "conv_id2",
}
}
Conversation 3.
{
"@timestamp": 1579632745000,
"context": {
"action": "B",
"conversationId": "conv_id3",
}
},
{
"@timestamp": 1579632745001,
"context": {
"action": "B.1",
"conversationId": "conv_id3",
}
}

Expected result:

{
"A -> A.1 -> A.1.1": 2,
"B -> B.1": 1
}
Something similar to this, in this or any other format.

I solved it using Elasticsearch's scripted_metric. Also, the index was changed from its initial state (the field names in the script below differ from the mapping above).

The script:

{
"size": 0,
"aggs": {
"intentPathsCountAgg": {
"scripted_metric": {
"init_script": "state.messagesList = new ArrayList();",
"map_script": "long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis; Map currentMessage = ['conversationId': doc['messageReceivedEvent.context.conversationId.keyword'], 'time': currentMessageTime, 'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value]; state.messagesList.add(currentMessage);",  
"combine_script": "return state",
"reduce_script": "List messages = new ArrayList(); Map conversationsMap = new HashMap(); Map intentsMap = new HashMap(); String[] ifElseWorkaround = new String[1]; for (state in states) { messages.addAll(state.messagesList);} messages.stream().forEach((message) -> { Map existingMessage = conversationsMap.get(message.conversationId); if(existingMessage == null || message.time > existingMessage.time) { conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]); } else { ifElseWorkaround[0] = ''; } }); conversationsMap.entrySet().forEach(conversation -> { if (intentsMap.containsKey(conversation.getValue().intentsPath)) { long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1; intentsMap.put(conversation.getValue().intentsPath, intentsCount); } else {intentsMap.put(conversation.getValue().intentsPath, 1L);} }); return intentsMap.entrySet().stream().map(intentPath -> [intentPath.getKey().toString(): intentPath.getValue()]).collect(Collectors.toSet()) "
}
}
}
}

Formatted script (for better readability, using .ts):

scripted_metric: {
init_script: 'state.messagesList = new ArrayList();',
map_script: `
long currentMessageTime = doc['messageReceivedEvent.context.timestamp'].value.millis;
Map currentMessage = [
'conversationId': doc['messageReceivedEvent.context.conversationId.keyword'],
'time': currentMessageTime,
'intentsPath': doc['brainQueryRequestEvent.brainQueryRequest.user_data.intentsHistoryPath.keyword'].value
];
state.messagesList.add(currentMessage);`,
combine_script: 'return state',
reduce_script: `
List messages = new ArrayList();
Map conversationsMap = new HashMap();
Map intentsMap = new HashMap();
boolean[] ifElseWorkaround = new boolean[1];
for (state in states) {
messages.addAll(state.messagesList);
}
messages.stream().forEach(message -> {
Map existingMessage = conversationsMap.get(message.conversationId);
if(existingMessage == null || message.time > existingMessage.time) {
conversationsMap.put(message.conversationId, ['time': message.time, 'intentsPath': message.intentsPath]);
} else {
ifElseWorkaround[0] = true;
}
});
conversationsMap.entrySet().forEach(conversation -> {
if (intentsMap.containsKey(conversation.getValue().intentsPath)) {
long intentsCount = intentsMap.get(conversation.getValue().intentsPath) + 1;
intentsMap.put(conversation.getValue().intentsPath, intentsCount);
} else {
intentsMap.put(conversation.getValue().intentsPath, 1L);
}
});
return intentsMap.entrySet().stream().map(intentPath -> [
'path': intentPath.getKey().toString(),
'count': intentPath.getValue()
]).collect(Collectors.toSet())`
}

The response:

{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 11,
"relation": "eq"
},
"max_score": null,
"hits": []
},
"aggregations": {
"intentPathsCountAgg": {
"value": [
{
"smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3": 2
},
{
"smallTalk.greet -> smallTalk.greet2 -> smallTalk.greet3  -> smallTalk.greet4": 1
},
{
"smallTalk.greet -> smallTalk.greet2": 1
}
]
}
}
}
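
To get exactly the shape of the expected result (one object that maps each path to its count), the value array from this response can be merged on the client side, for example with a small TypeScript helper:

// Merges [{ "path1": 2 }, { "path2": 1 }, ...] into { "path1": 2, "path2": 1, ... }.
function mergeIntentPaths(value: Array<Record<string, number>>): Record<string, number> {
  return Object.assign({}, ...value);
}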

Using a script in a Terms aggregation we can create buckets on the first character of "context.action", and under each parent bucket (e.g. A) we can get all the "context.action" values, e.g. A -> A.1 -> A.1.1 ...

Query:

{
  "size": 0,
  "aggs": {
    "conversations": {
      "terms": {
        "script": {
          "source": "def term=doc['context.action'].value; return term.substring(0,1);"
        },
        "size": 10
      },
      "aggs": {
        "sub_conversations": {
          "terms": {
            "script": {
              "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"
            },
            "size": 10
          }
        },
        "count": {
          "cardinality": {
            "script": {
              "source": "if(doc['context.action'].value.length()>1) return doc['context.action'];"
            }
          }
        }
      }
    }
  }
}

The parent terms script returns the first character (e.g. A, B, C); the sub_conversations terms script and the count cardinality script return every context.action under that parent, with the length check skipping the parent action itself (e.g. [A]).

Since it is not possible to join different documents in Elasticsearch, you have to build the combined keys on the client side by iterating over the aggregation buckets (a sketch of this follows the result below).

Result:

"aggregations" : {
"conversations" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "A",
"doc_count" : 6,
"sub_conversations" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "A.1",
"doc_count" : 2
},
{
"key" : "A.1.1",
"doc_count" : 2
}
]
},
"count" : {
"value" : 2
}
},
{
"key" : "B",
"doc_count" : 2,
"sub_conversations" : {
"doc_count_error_upper_bound" : 0,
"sum_other_doc_count" : 0,
"buckets" : [
{
"key" : "B.1",
"doc_count" : 1
}
]
},
"count" : {
"value" : 1
}
}
]
}
}
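
One way to do that combination on the client, as a TypeScript sketch over the exact response shape shown above (the bucket and field names come from that response; how you join the parent key and the sub-keys into one path is up to you):

// Walks the "conversations" buckets and builds keys like "A -> A.1 -> A.1.1",
// using the parent's cardinality ("count") as the value.
function combineKeys(aggregations: any): Record<string, number> {
  const combined: Record<string, number> = {};
  for (const parent of aggregations.conversations.buckets) {
    const subKeys = parent.sub_conversations.buckets.map((bucket: any) => bucket.key);
    const path = [parent.key, ...subKeys].join(' -> ');
    combined[path] = parent.count.value;
  }
  return combined;
}

For the result above this produces { "A -> A.1 -> A.1.1": 2, "B -> B.1": 1 }.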
