如何将转换后的日志记录索引到AWS Elasticsearch中



TLDR

由于"编码问题",lambda函数无法将消防软管日志索引到AWS管理的ES中。

实际错误响应

当我从消防软管record对单个logEvent进行base64编码并将收集的记录发送到AWS管理的ES.时,我没有得到任何错误

请参阅下一节了解更多详细信息。

基本64编码的压缩有效载荷被发送到ES,因为生成的json转换太大,ES无法索引-请参阅此ES链接。

我从AWS管理的ES中得到以下错误:

{
"deliveryStreamARN": "arn:aws:firehose:us-west-2:*:deliverystream/*",
"destination": "arn:aws:es:us-west-2:*:domain/*",
"deliveryStreamVersionId": 1,
"message": "The data could not be decoded as UTF-8",
"errorCode": "InvalidEncodingException",
"processor": "arn:aws:lambda:us-west-2:*:function:*"
}

如果输出记录未压缩,则为the body size is too long(小到14MB)。在没有压缩和简单的base64编码有效载荷的情况下,我在Lambda日志中得到以下错误:

{
"type": "mapper_parsing_exception",
"reason": "failed to parse",
"caused_by": {
"type": "not_x_content_exception",
"reason": "Compressor detection can only be called on some xcontent bytes or compressed xcontent bytes"
}
}

说明

我有Cloudwatch日志,这些日志按大小/间隔进行缓冲,并输入Kinesis Firehose。firehose将日志传输到lambda函数中,该函数将日志转换为json记录,然后将其发送到AWS管理的Elasticsearch集群。

lambda函数获得以下JSON结构:

{
"invocationId": "cf1306b5-2d3c-4886-b7be-b5bcf0a66ef3",
"deliveryStreamArn": "arn:aws:firehose:...",
"region": "us-west-2",
"records": [{
"recordId": "49577998431243709525183749876652374166077260049460232194000000",
"approximateArrivalTimestamp": 1508197563377,
"data": "some_compressed_data_in_base_64_encoding"
}]
}

lambda函数然后提取.records[].data,并将数据解码为base64,并对数据进行解压缩,从而生成以下JSON:

{
"messageType": "DATA_MESSAGE",
"owner": "aws_account_number",
"logGroup": "some_cloudwatch_log_group_name",
"logStream": "i-0221b6ec01af47bfb",
"subscriptionFilters": [
"cloudwatch_log_subscription_filter_name"
],
"logEvents": [
{
"id": "33633929427703365813575134502195362621356131219229245440",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_1"
},
{
"id": "33633929427703365813575134502195362621356131219229245441",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_2"
},
{
"id": "33633929427703365813575134502195362621356131219229245442",
"timestamp": 1508197557000,
"message": "Oct 16 23:45:57 some_log_entry_3"
}
]
}

.logEvents[]中的单个项目被转换为json结构,其中键是在Kibana中搜索日志时所需的列,类似于以下内容:

{
'journalctl_host': 'ip-172-11-11-111',
'process': 'haproxy',
'pid': 15507,
'client_ip': '172.11.11.111',
'client_port': 3924,
'frontend_name': 'http-web',
'backend_name': 'server',
'server_name': 'server-3',
'time_duration': 10,
'status_code': 200,
'bytes_read': 79,
'@timestamp': '1900-10-16T23:46:01.0Z',
'tags': ['haproxy'],
'message': 'HEAD / HTTP/1.1'
}

转换后的json被收集到一个数组中,该数组得到zlib压缩和base64编码的字符串,然后将其转换为新的json有效载荷,作为最终的lambda结果:

{
"records": [
{
"recordId": "49577998431243709525183749876652374166077260049460232194000000",
"result": "Ok",
"data": "base64_encoded_zlib_compressed_array_of_transformed_logs"
}
]}

Cloudwatch配置

13个日志条目(~4kb)可以转换到大约635kb。

我还降低了awslogs的阈值,希望发送到Lambda函数的日志大小变小:

buffer_duration = 10
batch_count = 10
batch_size = 500

不幸的是,当出现突发时,峰值可能超过2800行,其中大小超过1MB。

当lambda函数产生的有效负载"太大"(约13mb的转换日志)时,lambda cloudwatch日志中会记录一个错误-"主体大小太长"。似乎没有任何迹象表明这个错误来自哪里,也没有迹象表明lambda-fn的响应有效载荷是否有大小限制。

因此,AWS支持人员告诉我,无法减轻以下限制来解决此流程:

  1. lambda有效负载大小
  2. 进入lambda的压缩消防软管有效载荷与lambda输出成正比

相反,我将架构修改为:

  1. Cloudwatch日志通过Firehose在S3中备份
  2. S3事件由lambda函数处理
  3. 如果lambda转换并能够成功地将日志大容量索引到ES中,则lambda函数将返回一个成功代码
  4. 如果lambda函数失败,则为死信队列(AWS SQS)配置cloudwatch警报。可以在这里找到一个示例cloudformation片段
  5. 如果存在SQS消息,可以用这些消息手动调用lambda函数,或者设置AWS批处理作业来用lambda函数处理SQS消息。但是,需要注意的是,lambda函数不会再次故障转移到DLQ中。检查lambdacloudwatch日志以检查为什么未处理该消息并将其发送到DLQ

最新更新