Cloudformation aws glue内联命令



我的目标是通过 CloudFormation 创建一个 Glue 作业。我要处理的问题是 Command 属性似乎不支持内联代码（不像 CloudFormation Lambda 的 Code 属性那样支持内联）。
我的问题是：有没有一种方法可以单独使用 CloudFormation 模板创建一个功能齐全的 Glue 作业？有没有办法提前上传脚本文件（并指定 ScriptLocation）？

类似下面的东西可能会起作用（未经测试，因此您可能需要进行调整）。基本上，您创建了一个自定义资源来方便将 Glue 代码上传到 S3，然后引用该自定义资源来获取脚本位置。下面先给出 CloudFormation 模板，之后是可以用来动态生成该模板的 Rubycfn 代码。

{
"AWSTemplateFormatVersion": "2010-09-09",
"Description": "Glue Job Example Stack",
"Resources": {
"GlueCode": {
"Properties": {
"BucketName": "my-amazing-bucket-for-glue-jobs",
"Content": {
"Fn::Join": [
"\n",
[
"# some job code",
"# for glue"
]
]
},
"CreateMode": "plain-literal",
"FileName": "gluecode.sh",
"ServiceToken": {
"Fn::GetAtt": [
"InlineS3UploadFunction",
"Arn"
]
}
},
"Type": "Custom::InlineUpload"
},
"GlueJob": {
"Properties": {
"Command": {
"Name": "myglueetl",
"ScriptLocation": {
"Fn::Join": [
"",
[
"s3://my-amazing-bucket-for-glue-jobs/",
{
"Ref": "GlueCode"
}
]
]
}
},
"DefaultArguments": {
"--job-bookmark-option": "job-bookmark-enable"
},
"ExecutionProperty": {
"MaxConcurrentRuns": 2
},
"MaxRetries": 0,
"Name": "glue-job-1",
"Role": {
"Ref": "SomeRole"
}
},
"Type": "AWS::Glue::Job"
},
"InlineS3UploadFunction": {
"Properties": {
"Code": {
"ZipFile": {
"Fn::Join": [
"\n",
[
"import boto3",
"import cfnresponse",
"import hashlib",
"import json",
"import logging",
"import signal",
"import zipfile",
"",
"from urllib2 import build_opener, HTTPHandler, Request",
"",
"LOGGER = logging.getLogger()",
"LOGGER.setLevel(logging.INFO)",
"",
"def lambda_handler(event, context):",
"    # Setup alarm for remaining runtime minus a second",
"    try:",
"        signal.alarm((context.get_remaining_time_in_millis() / 1000) - 1)",
"        LOGGER.info('REQUEST RECEIVED: %s', event)",
"        LOGGER.info('REQUEST RECEIVED: %s', context)",
"        if event['RequestType'] == 'Create' or event['RequestType'] == 'Update':",
"            LOGGER.info('Creating or updating S3 Object')",
"            bucket_name = event['ResourceProperties']['BucketName']",
"            file_name = event['ResourceProperties']['FileName']",
"            content = event['ResourceProperties']['Content']",
"            create_zip = True if event['ResourceProperties']['CreateMode'] == 'zip' else False",
"            literal = True if event['ResourceProperties']['CreateMode'] == 'plain-literal' else False",
"            md5_hash = hashlib.md5(content).hexdigest()",
"            with open('/tmp/' + file_name, 'w') as lambda_file:",
"                lambda_file.write(content)",
"                lambda_file.close()",
"                s3 = boto3.resource('s3')",
"                if create_zip == True:",
"                    output_filename = file_name + '_' + md5_hash + '.zip'",
"                    zf = zipfile.ZipFile('/tmp/' + output_filename, mode='w')",
"                    try:",
"                        zf.write('/tmp/' + file_name, file_name)",
"                    finally:",
"                        zf.close()",
"                        data = open('/tmp/' + output_filename, 'rb')",
"                        s3.Bucket(bucket_name).put_object(Key=output_filename, Body=data)",
"                else:",
"                    if literal == True:",
"                        data = open('/tmp/' + file_name, 'rb')",
"                        s3.Bucket(bucket_name).put_object(Key=file_name, Body=content)",
"                    else:",
"                        extension = file_name.split(\".\")[-1]",
"                        output_filename = \".\".join(file_name.split(\".\")[:-1]) + '_' + md5_hash + '.' + extension",
"                        data = open('/tmp/' + file_name, 'rb')",
"                        s3.Bucket(bucket_name).put_object(Key=output_filename, Body=content)",
"            cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': output_filename } )",
"        elif event['RequestType'] == 'Delete':",
"            LOGGER.info('DELETE!')",
"            cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': 'Resource deletion successful!'} )",
"        else:",
"            LOGGER.info('FAILED!')",
"            cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': 'There is no such success like failure.'} )",
"    except Exception as e: #pylint: disable=W0702",
"        LOGGER.info(e)",
"        cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': 'There is no such success like failure.' } )",
"",
"def timeout_handler(_signal, _frame):",
"    '''Handle SIGALRM'''",
"    LOGGER.info('Time exceeded')",
"    raise Exception('Time exceeded')",
"",
"signal.signal(signal.SIGALRM, timeout_handler)"
]
]
}
},
"Handler": "index.lambda_handler",
"Role": {
"Fn::GetAtt": [
"LambdaExecutionRole",
"Arn"
]
},
"Runtime": "python2.7",
"Timeout": "30"
},
"Type": "AWS::Lambda::Function"
},
"LambdaExecutionRole": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": [
"sts:AssumeRole"
],
"Effect": "Allow",
"Principal": {
"Service": [
"lambda.amazonaws.com"
]
}
}
],
"Version": "2012-10-17"
},
"Path": "/",
"Policies": [
{
"PolicyDocument": {
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "arn:aws:logs:*:*:*"
},
{
"Action": "s3:*",
"Effect": "Allow",
"Resource": "arn:aws:s3:::*"
}
],
"Version": "2012-10-17"
},
"PolicyName": "root"
}
]
},
"Type": "AWS::IAM::Role"
}
}
}

Rubycfn 代码如下。使用 https://rubycfn.com/ 上的 CloudFormation 编译器，或安装 Rubycfn gem（gem install rubycfn）后进行编译：

# Default S3 bucket name for the Glue script upload; override by setting the
# GLUEBUCKET environment variable before compiling the template.
ENV["GLUEBUCKET"] ||= "my-amazing-bucket-for-glue-jobs"
description "Glue Job Example Stack"
# Custom resource that uploads the inline Glue script to S3 through the
# InlineS3UploadFunction Lambda. With create_mode "plain-literal" the object
# is stored under the exact file_name, so this resource's Ref yields the key
# the Glue job's ScriptLocation points at.
resource :glue_code,
         type: "Custom::InlineUpload" do |r|
  r.property(:service_token) { :inline_s3_upload_function.ref(:arn) }
  r.property(:bucket_name) { ENV["GLUEBUCKET"] }
  r.property(:file_name) { "gluecode.sh" }
  r.property(:create_mode) { "plain-literal" }
  r.property(:content) do
    [
      "# some job code",
      "# for glue"
    ].fnjoin("\n") # join with a real newline — "n" would glue the lines together with a literal letter n
  end
end
# AWS::Glue::Job whose script is the S3 object created by the :glue_code
# custom resource (its Ref resolves to the uploaded object key).
resource :glue_job,
         type: "AWS::Glue::Job" do |job|
  job.property(:command) do
    {
      "Name": "myglueetl",
      "ScriptLocation": ["s3://#{ENV["GLUEBUCKET"]}/", :glue_code.ref].fnjoin
    }
  end
  job.property(:default_arguments) { { "--job-bookmark-option": "job-bookmark-enable" } }
  job.property(:execution_property) { { "MaxConcurrentRuns": 2 } }
  job.property(:max_retries) { 0 }
  job.property(:name) { "glue-job-1" }
  job.property(:role) { :some_role.ref }
end
# Lambda-backed custom resource handler: writes the supplied Content to S3,
# either as-is ("plain-literal"), zipped ("zip"), or with an MD5 hash suffix.
# NOTE(review): the embedded handler is Python 2 (urllib2 import, python2.7
# runtime), which AWS Lambda no longer supports for new functions — port the
# handler and runtime to a python3.x version before real use.
resource :inline_s3_upload_function,
         type: "AWS::Lambda::Function" do |r|
  r.property(:code) do
    {
      "ZipFile": [
        "import boto3",
        "import cfnresponse",
        "import hashlib",
        "import json",
        "import logging",
        "import signal",
        "import zipfile",
        "",
        "from urllib2 import build_opener, HTTPHandler, Request",
        "",
        "LOGGER = logging.getLogger()",
        "LOGGER.setLevel(logging.INFO)",
        "",
        "def lambda_handler(event, context):",
        "    # Setup alarm for remaining runtime minus a second",
        "    try:",
        "        signal.alarm((context.get_remaining_time_in_millis() / 1000) - 1)",
        "        LOGGER.info('REQUEST RECEIVED: %s', event)",
        "        LOGGER.info('REQUEST RECEIVED: %s', context)",
        "        if event['RequestType'] == 'Create' or event['RequestType'] == 'Update':",
        "            LOGGER.info('Creating or updating S3 Object')",
        "            bucket_name = event['ResourceProperties']['BucketName']",
        "            file_name = event['ResourceProperties']['FileName']",
        "            content = event['ResourceProperties']['Content']",
        "            create_zip = True if event['ResourceProperties']['CreateMode'] == 'zip' else False",
        "            literal = True if event['ResourceProperties']['CreateMode'] == 'plain-literal' else False",
        "            md5_hash = hashlib.md5(content).hexdigest()",
        "            with open('/tmp/' + file_name, 'w') as lambda_file:",
        "                lambda_file.write(content)",
        "                lambda_file.close()",
        "                s3 = boto3.resource('s3')",
        "                if create_zip == True:",
        "                    output_filename = file_name + '_' + md5_hash + '.zip'",
        "                    zf = zipfile.ZipFile('/tmp/' + output_filename, mode='w')",
        "                    try:",
        "                        zf.write('/tmp/' + file_name, file_name)",
        "                    finally:",
        "                        zf.close()",
        "                        data = open('/tmp/' + output_filename, 'rb')",
        "                        s3.Bucket(bucket_name).put_object(Key=output_filename, Body=data)",
        "                else:",
        "                    if literal == True:",
        "                        data = open('/tmp/' + file_name, 'rb')",
        "                        s3.Bucket(bucket_name).put_object(Key=file_name, Body=content)",
        "                    else:",
        # The inner double quotes must be escaped — unescaped they terminate
        # the Ruby string literal and break the script with a syntax error.
        "                        extension = file_name.split(\".\")[-1]",
        "                        output_filename = \".\".join(file_name.split(\".\")[:-1]) + '_' + md5_hash + '.' + extension",
        "                        data = open('/tmp/' + file_name, 'rb')",
        "                        s3.Bucket(bucket_name).put_object(Key=output_filename, Body=content)",
        "            cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': output_filename } )",
        "        elif event['RequestType'] == 'Delete':",
        "            LOGGER.info('DELETE!')",
        "            cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': 'Resource deletion successful!'} )",
        "        else:",
        "            LOGGER.info('FAILED!')",
        "            cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': 'There is no such success like failure.'} )",
        "    except Exception as e: #pylint: disable=W0702",
        "        LOGGER.info(e)",
        "        cfnresponse.send(event, context, cfnresponse.SUCCESS, { 'Message': 'There is no such success like failure.' } )",
        "",
        "def timeout_handler(_signal, _frame):",
        "    '''Handle SIGALRM'''",
        "    LOGGER.info('Time exceeded')",
        "    raise Exception('Time exceeded')",
        "",
        "signal.signal(signal.SIGALRM, timeout_handler)"
      ].fnjoin("\n") # join with a real newline — "n" would mangle the handler source
    }
  end
  r.property(:handler) { "index.lambda_handler" }
  r.property(:role) { :lambda_execution_role.ref(:arn) }
  r.property(:runtime) { "python2.7" }
  r.property(:timeout) { "30" }
end
# Execution role for the inline-upload Lambda: CloudWatch Logs permissions
# plus broad S3 access. NOTE(review): "s3:*" on "arn:aws:s3:::*" is very wide;
# consider scoping it to the Glue bucket for production use.
resource :lambda_execution_role,
         type: "AWS::IAM::Role" do |role|
  role.property(:assume_role_policy_document) do
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": { "Service": ["lambda.amazonaws.com"] },
          "Action": ["sts:AssumeRole"]
        }
      ]
    }
  end
  role.property(:path) { "/" }
  role.property(:policies) do
    [
      {
        "PolicyName": "root",
        "PolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Effect": "Allow",
              "Action": ["logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents"],
              "Resource": "arn:aws:logs:*:*:*"
            },
            {
              "Effect": "Allow",
              "Action": "s3:*",
              "Resource": "arn:aws:s3:::*"
            }
          ]
        }
      }
    ]
  end
end

最新更新