我正在尝试使用AWS lambda读取kafka主题(AWS MSK(中的值。
从lambda打印的事件记录如下所示:
{eventSource':'aws:kafka','eventSourceArn':'arn:aws:kafka:ap-northest-1:987654321:cluster/mskluster/79y80c66-813a-4f-af0e-4ea47ba107e6','records':{'事务-0:[{'topic':'Transactions','partition':0,'offset':4798,'timestamp':1603565835915,'timetampType':'CREATE_TIME','value':'eyJFdmVudFRpbWUiOiAiMjAyMC0xMC0yNCAxODo1NzoxNS45MTUzMjQiLCAiSVAiOiMTgwLjI0MS4xNTkuMjE4IiwgIkFjY291bnROdW1iZXIiIiiiIiMTQ2ODA4ODYiLCAiVXNlck5hbWUiOi67iQW1iZXIgUm9tYXJvIiwgIkFtb3VudCI6ICI1NTYIwgIlRyYW5zYWN0aW9uSUQiOiAiTzI4Qlg3TlBJbWZmSXExWCIsICJDb3VuTHJ5IjogIk9tYW4ifQ=='}]}}
如何提取"主题"one_answers"值"字段?值1是base64编码的。我得到以下错误:
名称错误:名称"记录"未定义
我正在尝试以下代码:
import json
import base64
def lambda_handler(event, context):
print(event)
message = event['records']
payload=base64.b64decode(record["message"]["value"])
print("Decoded payload: " + str(payload))
示例MSK事件结构
在代码片段中,试图传递给解码函数的record
变量不存在。迭代记录的一个例子是:
records = event['records']['Transactions-0']
for record in records:
payload=base64.b64decode(record["message"]["value"])
print("Decoded payload: " + str(payload))
每个函数调用每个主题都包含多个记录。虽然如果你有多个像Transactions-1
这样的,你也可以迭代这些,。。。