使用AWS Lambda读取Kafka(MSK)事件源



我正在尝试使用AWS lambda读取kafka主题(AWS MSK(中的值。

从lambda打印的事件记录如下所示:

{eventSource':'aws:kafka','eventSourceArn':'arn:aws:kafka:ap-northest-1:987654321:cluster/mskluster/79y80c66-813a-4f-af0e-4ea47ba107e6','records':{'事务-0:[{'topic':'Transactions','partition':0,'offset':4798,'timestamp':1603565835915,'timetampType':'CREATE_TIME','value':'eyJFdmVudFRpbWUiOiAiMjAyMC0xMC0yNCAxODo1NzoxNS45MTUzMjQiLCAiSVAiOiMTgwLjI0MS4xNTkuMjE4IiwgIkFjY291bnROdW1iZXIiIiiiIiMTQ2ODA4ODYiLCAiVXNlck5hbWUiOi67iQW1iZXIgUm9tYXJvIiwgIkFtb3VudCI6ICI1NTYIwgIlRyYW5zYWN0aW9uSUQiOiAiTzI4Qlg3TlBJbWZmSXExWCIsICJDb3VuTHJ5IjogIk9tYW4ifQ=='}]}}

如何提取"主题"one_answers"值"字段?值1是base64编码的。我得到以下错误:

名称错误:名称"记录"未定义

我正在尝试以下代码:

import json
import base64
def lambda_handler(event, context):
print(event)
message = event['records']
payload=base64.b64decode(record["message"]["value"])
print("Decoded payload: " + str(payload))

示例MSK事件结构

在代码片段中,试图传递给解码函数的record变量不存在。迭代记录的一个例子是:

records = event['records']['Transactions-0']
for record in records:
payload=base64.b64decode(record["message"]["value"])
print("Decoded payload: " + str(payload))

每个函数调用每个主题都包含多个记录。虽然如果你有多个像Transactions-1这样的,你也可以迭代这些,。。。

最新更新