我们正在尝试使用lambda函数在aws-dynamodb上使用kineesis数据生成器上传实时数据。我们的代码在运行测试用例时工作良好,但在生产环境中无法上传数据。下面是我们的lambda函数:
import boto3
import json
import base64
def lambda_handler(event, context):
dynamo_db = boto3.resource('dynamodb')
table = dynamo_db.Table('dyno-table')
try:
data = [record.get('kinesis').get('data') for record in event['Records']]
for record in data:
recordeddata = base64.b64decode(record).decode("utf-8")
decoded_data_dic = json.loads(recordeddata)
with table.batch_writer() as batch_writer:
batch_writer.put_item(Item=decoded_data_dic)
return data
except Exception as e:
return str(e)
这是我们的json数据的样本从kinesis发送。
{
"Records": [
{
"kinesis": {
"kinesisSchemaVersion": "1.0",
"partitionKey": "4159727726",
"sequenceNumber": "49639282004144167688658152268735854270994120426068639794",
"data": "eyJmaXJzdE5hbWUiOiJBbnRvbmV0dGUiLCJsYXN0TmFtZSI6IlRoaWVsIiwiaWQiOjYsImlwIjoiMTAwLjIyNy4zOC4yMTMiLH0K",
"approximateArrivalTimestamp": 1680243715.662
},
"eventSource": "aws:kinesis",
"eventVersion": "1.0",
"eventID": "shardId-000000000003:49639282004144167688658152268735854270994120426068639794",
"eventName": "aws:kinesis:record",
"invokeIdentityArn": "arn:aws:iam::777050133147:role/AvaniK-LambdaRole",
"awsRegion": "ap-northeast-1",
"eventSourceARN": "arn:aws:kinesis:ap-northeast-1:777050133147:stream/DeepakR-datastream"
}
]
}
数据模式:
{
"firstName":"{{name.firstName}}",
"lastName":"{{name.lastName}}",
"id":{{random.number(70)}},
"ip":"{{internet.ip}}",
}
您的问题是由于您在Kinesis base64编码的有效载荷中发送格式错误的JSON:
{"firstName":"Antonette","lastName":"Thiel","id":6,"ip":"100.227.38.213",}
这会导致json.loads()
函数失败:
Expecting property name enclosed in double quotes: line 1 column 74 (char 73)
你需要提供有效的JSON来工作。