有人在使用 AWS kinesis 流、lambda 和 firehose 时遇到数据丢失的情况吗?



我目前正在向 aws kinesis 流发送一系列 xml 消息,我一直在不同的项目中使用它,所以我非常有信心这个位有效。然后我写了一个lambda来处理从kinesis流到kinesis firehose的事件:

import os
import boto3
import base64
firehose = boto3.client('firehose')

def lambda_handler(event, context):
deliveryStreamName = os.environ['FIREHOSE_STREAM_NAME']
# Send record directly to firehose
for record in event['Records']:
data = record['kinesis']['data']
response = firehose.put_record(
DeliveryStreamName=deliveryStreamName,
Record={'Data': data}
)
print(response)

我已将 kinesis 流设置为 lamdba 触发器,并将批大小设置为 1,起始位置最新。

对于运动消防水带,我有以下配置:

Data transformation*: Disabled
Source record backup*: Disabled
S3 buffer size (MB)*: 10
S3 buffer interval (sec)*: 60
S3 Compression: UNCOMPRESSED
S3 Encryption: No Encryption
Status: ACTIVE
Error logging: Enabled

我发送了 162 个事件,我从 s3 读取了它们,我设法获得的最多是 160 个,通常更少。我什至尝试等待几个小时,以防重试发生奇怪的事情。

有人有使用Kinesis->lamdba->消防水带的经验,并看到数据丢失的问题吗?

从我在这里看到的情况来看,当您将数据发布到 Kinesis Stream(而不是 FireHose(时,很可能的项目会丢失。

由于您在写入 FireHose 时使用的是put_record,因此它将引发异常,在这种情况下将重试 lambda。(检查该级别是否存在故障是有意义的(。

因此,考虑到我可能会认为记录在到达 Kinesis 流之前就丢失了。 如果您使用put_records方法将项目发送到 Kinesis 流,这并不能保证所有记录都将发送到流(由于超出写入吞吐量或内部错误(,某些记录可能无法发送。在这种情况下,失败的记录子集应该由您的代码重新发送(这是 Java 示例,抱歉我找不到 Python 示例(。

最新更新