我正在做Glue ETL处理,它基本上做以下操作-
- 从S3读取文件(通过Glue Catalog)
- 传输数据(添加/删除列)
- 将数据写入RDS postgre表(也通过Glue Catalog)
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'SRC_DB', 'SRC_TABLE', 'TGT_DB', 'TGT_TABLE'])
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = args['SRC_DB'], table_name = args['SRC_TABLE'], transformation_ctx = "DataSource0")
Transform0 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"sparkDataSource": DataSource0}, transformation_ctx = "Transform0")
DataSink0 = glueContext.write_dynamic_frame.from_catalog(frame = Transform0, database = args['TGT_DB'], table_name = args['TGT_TABLE'], transformation_ctx = "DataSink0")
我想在这里实现的是过滤掉不良记录(例如,如果数据值长度大于数据目录或RDS表中定义的长度,则在任何列的任何记录中),并将这些记录插入到其他表或S3文件中,并继续处理,没有例外。这样我就可以将不良记录报告给源团队。
这里发生的事情是,如果转换后有任何坏数据(列数据类型或长度不匹配),则粘合ETL作业将因批处理异常而中止。
您可以按照以下方法解决这个问题:
-
使用Glue图。应用UDF,您可以在其中传递动态记录并计算新列的长度。完成后,您可以返回带有附加记录的动态帧。请参阅此链接以获取UDF示例。
d["date"] = datetime.today() return d datasource1 = Map.apply(frame = datasource0, f = addDate)```
-
现在使用Glue
splitrows
转换通过将长度列传递给comparison_dict来实现这一点。下面是一个带有条件的示例,并根据条件返回两个动态帧作为集合。
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100",
"<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')