我有一个胶水工作,从S3传输数据到Redshift。我希望它能够调度它,以便每次在S3中的数据被重新加载或更新时都运行它。我该怎么做呢?我尝试了代码sol在这里,并做了一个lambda函数:如何触发胶水ETL Pyspark作业通过S3事件或AWS lambda ?
import boto3
print('Loading function')
def lambda_handler(event, context):
source_bucket = event['Records'][0]['s3']['bucket']['name']
s3 = boto3.client('s3')
glue = boto3.client('glue')
gluejobname = "YOUR GLUE JOB NAME"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
print('Error getting object {} from bucket {}. Make sure they exist '
'and your bucket is in the same region as this '
'function.'.format(source_bucket, source_bucket))
raise e
替换作业名称。但是,运行这个命令会得到:
Response
{
"errorMessage": "'Records'",
"errorType": "KeyError",
"stackTrace": [
" File "/var/task/lambda_function.py", line 5, in lambda_handlern source_bucket = event['Records'][0]['s3']['bucket']['name']n"
]
}
Function Logs
START RequestId: 9d063917-958a-494c-8ef9-f1f58e866562 Version: $LATEST
[ERROR] KeyError: 'Records'
Traceback (most recent call last):
File "/var/task/lambda_function.py", line 5, in lambda_handler
source_bucket = event['Records'][0]['s3']['bucket']['name']
END RequestId: 9d063917-958a-494c-8ef9-f1f58e866562
REPORT RequestId: 9d063917-958a-494c-8ef9-f1f58e866562 Duration: 9.41 ms Billed Duration: 10 ms Memory Size: 128 MB Max Memory Used: 65 MB Init Duration: 305.81 ms
Request ID
9d063917-958a-494c-8ef9-f1f58e866562
您不需要更新任何内容,除了第8行中的GLUE JOB NAME。源桶信息从EVENT对象中检索。根据lambda触发器配置将文件上传到s3对象位置,并检查cloudwatch日志。