根据Glue作业状态将文件传输到S3存储桶中


I am new to **AWS Glue,** and my aim is to extract  transform and load files uploaded in S3 bucket to RDS instance. Also I need to transfer the files into separate S3 buckets based on the Glue Job status (Success /Failure). There will be more than one file uploaded into the initial S3 bucket. How can I get the name of the files uploaded so that i can transfer those files to appropriate buckets.

步骤1:将文件上传到S3 bucket1。步骤2:触发lamda函数调用Job1步骤3:作业1成功将文件传输到S3桶2步骤4:故障转移到另一个S3桶

让lambda事件触发器侦听正在上载的文件夹在lambda中,使用AWS Glue API运行粘合作业(本质上是AWS Glue中的python脚本(。

在Glue python脚本中,使用适当的库,如pymysql等。作为一个与python脚本打包的外部库。

执行从S3到RDS表的数据加载操作。如果你是使用Aurora Mysql,AWS提供了一个不错的功能"从S3加载",因此您可以直接加载文件到表中(您可能需要在参数组/IAM角色(。

调用粘合作业的Lambda脚本:

s3 = boto3.client('s3')
glue = boto3.client('glue')
def lambda_handler(event, context):
gluejobname="<YOUR GLUE JOB NAME>"
try:
runId = glue.start_job_run(JobName=gluejobname)
status = glue.get_job_run(JobName=gluejobname, RunId=runId['JobRunId'])
print("Job Status : ", status['JobRun']['JobRunState'])
except Exception as e:
print(e)
raise e

粘合脚本:

import mysql.connector
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.context import DynamicFrame
from awsglue.transforms import *
from pyspark.sql.types import StringType
from pyspark.sql.types import DateType
from pyspark.sql.functions import unix_timestamp, from_unixtime
from pyspark.sql import SQLContext
# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())
url="<RDS URL>"
uname="<USER NAME>"
pwd="<PASSWORD>"
dbase="DBNAME"

def connect():
conn = mysql.connector.connect(host=url, user=uname, password=pwd, database=dbase)
cur = conn.cursor()
return cur, conn
def create_stg_table():
cur, conn = connect()
createStgTable1 = <CREATE STAGING TABLE IF REQUIRED>
loadQry = "LOAD DATA FROM S3 PREFIX 'S3://PATH FOR YOUR CSV' REPLACE INTO TABLE <DB.TABLENAME> FIELDS TERMINATED BY '|' LINES TERMINATED BY 'n' IGNORE 1 LINES (@var1, @var2, @var3, @var4, @var5, @var6, @var7, @var8) SET ......;"
cur.execute(createStgTable1)
cur.execute(loadQry)
conn.commit()
conn.close()

然后,您可以创建一个cloudwatch警报,其中检查粘合作业的状态,并根据状态在S3之间执行文件复制操作。我们的生产中也有类似的设置。

问候

Yuva

相关内容

最新更新