AWS Lambda函数,密钥管理器无法获取密钥值



我创建了一个lambda函数,可以使用我的电子邮件地址和应用程序密码将账单报告发送到某些电子邮件地址。我已经将secret manager下的应用程序密码保存为其他api。当我尝试使用lambda函数检索它时,它显示错误。

import boto3
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
import ast
import os
import datetime
import base64
import logging
import collections
import json
logger = logging.getLogger()
logger.setLevel(logging.INFO)

def get_secret():
secret_name = "email_app_password"
region_name = "ca-central-1"
# Create a Secrets Manager client
session = boto3.session.Session()
client = session.client(
service_name='secretsmanager',
region_name=region_name
)
# In this sample we only handle the specific exceptions for the 'GetSecretValue' API.
# See https://docs.aws.amazon.com/secretsmanager/latest/apireference/API_GetSecretValue.html
# We rethrow the exception by default.
try:
get_secret_value_response = client.get_secret_value(
SecretId=secret_name
)
except ClientError as e:
if e.response['Error']['Code'] == 'DecryptionFailureException':
# Secrets Manager can't decrypt the protected secret text using the provided KMS key.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InternalServiceErrorException':
# An error occurred on the server side.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidParameterException':
# You provided an invalid value for a parameter.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'InvalidRequestException':
# You provided a parameter value that is not valid for the current state of the resource.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
elif e.response['Error']['Code'] == 'ResourceNotFoundException':
# We can't find the resource that you asked for.
# Deal with the exception here, and/or rethrow at your discretion.
raise e
else:
# Decrypts secret using the associated KMS key.
# Depending on whether the secret is a string or binary, one of these fields will be populated.
if 'SecretString' in get_secret_value_response:
secret = get_secret_value_response['SecretString']
else:
decoded_binary_secret = base64.b64decode(get_secret_value_response['SecretBinary'])

# Your code goes here. 

def send_email(email_body):
"""
Sends email according to environment variables.
:param email_body: Body of email.
:return: None
"""
msg = MIMEMultipart('alternative')
email = os.environ['EMAIL_FROM']
try:
secret_value = get_secret()
print(secret_value[os.environ['SECRET_KEY_NAME']])
except Exception as e:
logger.exception("Exception while trying to get password from Secrets Manager")
return
sec_key = os.environ['SECRET_KEY_NAME']
password = ast.literal_eval(secret_value)[os.environ['SECRET_KEY_NAME']]
msg['Subject'] = os.environ["EMAIL_SUBJECT"]
msg['From'] = email
you = os.environ['EMAIL_TO'].split(',')
msg['To'] = ", ".join(you)
body = email_body
msg.attach(MIMEText(body, 'html'))
try:
smtpObj = smtplib.SMTP('smtp.gmail.com', 587)
smtpObj.starttls()
smtpObj.login(email, password)
smtpObj.sendmail(email, you, msg.as_string())
smtpObj.quit()
logger.info('Email sent')
except smtplib.SMTPException as e:
logger.info("Error: unable to send email due to %s", e)

class FileOpener:
"""
Class to cache file contents.
"""
file_cache = {}
@staticmethod
def open_file(filename):
if filename not in FileOpener.file_cache:
with open(filename) as fp:
FileOpener.file_cache[filename] = fp.read()
return FileOpener.file_cache[filename]

def get_account_cost(account_no, start_day, end_day):
client = boto3.client('ce')
response = client.get_cost_and_usage(
TimePeriod={
'Start': start_day,
'End': end_day
},
Granularity='MONTHLY',
Filter={
"And": [{
"Dimensions": {
"Key": "LINKED_ACCOUNT",
"Values": [account_no]
}
}, {
"Not": {
"Dimensions": {
"Key": "RECORD_TYPE",
"Values": ["Credit", "Refund"]
}
}
}]
},
Metrics=["BlendedCost"],
GroupBy=[
{
'Type': 'DIMENSION',
'Key': 'SERVICE'
},
]
)
cost_dictionary = collections.Counter()
for result in response['ResultsByTime']:
for group in result['Groups']:
servicename = group['Keys'][0]
amount = round(float(group['Metrics']['BlendedCost']['Amount']), 2)
if amount == 0.0:
continue
cost_dictionary[servicename] += amount
return cost_dictionary

def combine_cost_dictionaries(prev_cost_dict, curr_cost_dict):
combined_cost_dict = {}
prev_cost_total = 0.0
curr_cost_total = 0.0
for service, curr_cost in curr_cost_dict.items():
prev_cost = 0.0
if service in prev_cost_dict:
prev_cost = prev_cost_dict[service]
combined_cost_dict[service] = (prev_cost, curr_cost)
prev_cost_total += prev_cost
curr_cost_total += curr_cost
for service, prev_cost in prev_cost_dict.items():
curr_cost = 0.0
if service not in combined_cost_dict:
combined_cost_dict[service] = (prev_cost, curr_cost)
prev_cost_total += prev_cost
curr_cost_total += curr_cost
return combined_cost_dict, prev_cost_total, curr_cost_total

def generate_account_cost_html(account_name, combined_cost_dict, prev_cost_total, curr_cost_total):
prev_date = str((datetime.datetime.now().date() - datetime.timedelta(days=2)))
curr_date = str((datetime.datetime.now().date() - datetime.timedelta(days=1)))
table_rows = ""
sorted_combined_cost = sorted(combined_cost_dict.items(), key=lambda x: x[1][1], reverse=True)
for service, costs in sorted_combined_cost:
table_row = FileOpener.open_file("table_row.html")
prev_cost = round(float(costs[0]), 2)
curr_cost = round(float(costs[1]), 2)
if prev_cost < 0.01:
percentage_change = 'New Charge'
else:
percentage_change = ((curr_cost - prev_cost) / prev_cost) * 100
percentage_change = round(float(percentage_change), 2)
if percentage_change > 0:
percentage_change = "↑ {}%".format(percentage_change)
elif percentage_change == 0.0:
percentage_change = "{}%".format(percentage_change)
else:
percentage_change = "↓ {}%".format(percentage_change)
if percentage_change[0] == '↑':
percentage_css = "background-color: pink;border:darkblue solid thin;"
elif percentage_change == 'New Charge':
percentage_css = "background-color: LightGreen;border:darkblue solid thin;"
else:
percentage_css = "border:lightblue solid thin;"
table_rows += table_row.format(service, prev_cost, curr_cost, percentage_css, percentage_change)
prev_cost_total = round(float(prev_cost_total), 2)
curr_cost_total = round(float(curr_cost_total), 2)
table = FileOpener.open_file("table.html")
table = table.format(account_name, prev_cost_total, curr_cost_total, prev_date, curr_date, table_rows)
return table

def lambda_handler(event, context):
account_names = os.environ['ACCOUNT_NAMES'].split(",")
account_numbers = os.environ['ACCOUNT_NUMBERS'].split(",")
table_body_html = ''
for account_name, account_no in zip(account_names, account_numbers):
day_1 = str((datetime.datetime.now().date() - datetime.timedelta(days=2)))
day_2 = str((datetime.datetime.now().date() - datetime.timedelta(days=1)))
day_3 = str(datetime.datetime.now().date())
prev_cost_dict = get_account_cost(account_no, day_1, day_2)
curr_cost_dict = get_account_cost(account_no, day_2, day_3)
combined_cost_dict, prev_cost_total, curr_cost_total = combine_cost_dictionaries(prev_cost_dict, curr_cost_dict)
table_body = generate_account_cost_html(account_name, combined_cost_dict, prev_cost_total, curr_cost_total)
table_body_html += table_body
email_body = FileOpener.open_file("email_body.html").format(table_body_html)
send_email(email_body)

在运行lambda函数时,我得到以下错误。我甚至试过去掉打印线。然后错误转到password=ast.literal_eval(secret_value([os.environg['secret_KEY_NAME']]

[ERROR] 2022-08-18T06:02:03.968Z    2ae88ceb-39a6-4feb-aa7c-2cbb17ec655c    Exception while trying to get password from Secrets Manager
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 88, in send_email
print(secret_value[os.environ['SECRET_KEY_NAME']])
TypeError: 'NoneType' object is not subscriptableEND RequestId: 2ae88ceb-39a6-4feb-aa7c-2cbb17ec655c

get_secret函数不会将值返回给调用代码。请编辑else子句:

else:
# Decrypts secret using the associated KMS key.
# Depending on whether the secret is a string or binary, one of these fields will be populated.
if 'SecretString' in get_secret_value_response:
return get_secret_value_response['SecretString']
else:
return base64.b64decode(get_secret_value_response['SecretBinary'])

当我这样做的时候,我也陷入了这个非直观的错误:

except ClientError as e:
print(f"exception: {e}")

问题很清楚了,我需要将机密管理器权限添加到执行角色的权限边界中。

我希望这能帮助到别人。

最新更新