AWS Glue ETL job that creates a new column which is a substring of an existing column



I have a data source in an S3 bucket. The source is a CSV file with a single column, "ID". I want to use AWS Glue for the ETL job: extract the data from the S3 bucket, create a second column ("ID suffix") that contains the last two characters of "ID", and then load the resulting file into a different S3 bucket. So if "ID" is 1000031, I want the second column to be 31.
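
To illustrate, the transformation I'm after is just the last two characters of the ID, e.g. in plain Python ("id_suffix" is only my placeholder name for the new column):

row = {"id": "1000031"}
row["id_suffix"] = str(row["id"])[-2:]   # last two characters -> "31"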

Below is the script AWS Glue generated for the simple task of pulling a file from one S3 bucket and writing it to another. I would like to edit it to accomplish the task above. Any help would be much appreciated. Thanks!

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "stackoverflow", table_name = "sample_data_csv", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "stackoverflow", table_name = "sample_data_csv", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("id", "int", "id", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("id", "int", "id", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-scripts-us-west-1/Sample data"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

You can achieve this with Map.apply and a user-defined function. See the input and output I got after running the following script:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://aws-glue-us-east-2/test.csv"]}, format = "csv")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "int", "id", "int")], transformation_ctx = "applymapping1")
# UDF that adds a "sub_id" column holding the last two characters of "id"
def map_function(dynamicRecord):
    sub_id = str(dynamicRecord["id"])[-2:]  # cast to str so slicing works even though "id" is mapped as int
    dynamicRecord["sub_id"] = sub_id
    return dynamicRecord

mapping1 = Map.apply(frame = applymapping1, f = map_function, transformation_ctx = "mapping1")
datasink2 = glueContext.write_dynamic_frame.from_options(frame = mapping1, connection_type = "s3", connection_options = {"path": "s3://aws-glue-us-east-2/Sample_output"}, format = "csv", transformation_ctx = "datasink2")
job.commit()

Once I ran this, I got the following output:

Input:
id
1000031
1000032
1000034
1000035
1000036
1000037
1000039
1000030
Output:
sub_id,id
31,1000031
32,1000032
34,1000034
35,1000035
36,1000036
37,1000037
39,1000039
30,1000030
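
If you would rather not use a Python UDF, a rough equivalent (a sketch, assuming the same "id" column produced by applymapping1 above) is to convert the DynamicFrame to a Spark DataFrame, use the built-in substring function, and convert back so the existing datasink2 call can write it out unchanged:

from pyspark.sql.functions import col, substring
from awsglue.dynamicframe import DynamicFrame

df = applymapping1.toDF()
# a negative start position in substring counts from the end of the string
df = df.withColumn("sub_id", substring(col("id").cast("string"), -2, 2))
mapping1 = DynamicFrame.fromDF(df, glueContext, "mapping1")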
