AWS Glue fails to load new partitions from an ETL job



I am trying to use an ETL job to write a DynamicFrame directly to the Data Catalog and update its partitions.

I have code like this:

datasink4 = glueContext.write_dynamic_frame.from_options(
    frame=dropnullfields3,
    connection_type="s3",
    connection_options={
        "path": TARGET_PATH,
        "partitionKeys": ["x", "y"]
    },
    format="parquet",
    transformation_ctx="datasink4")

additionalOptions = {"enableUpdateCatalog": True}
additionalOptions["partitionKeys"] = ["x", "y"]

sink = glueContext.write_dynamic_frame_from_catalog(
    frame=dropnullfields3,
    database=DATABASE,
    table_name=TABLE,
    transformation_ctx="write_sink",
    additional_options=additionalOptions)

This does write the data and update the catalog, but it writes the data twice. To avoid the double write, I followed Method 2 from the documentation for updating partitions: https://docs.aws.amazon.com/glue/latest/dg/update-from-job.html

and came up with the following code:

datasink4 = glueContext.write_dynamic_frame.from_options(
    frame=dropnullfields3,
    connection_type="s3",
    connection_options={
        "path": TARGET_PATH,
        "partitionKeys": ["x", "y"]
    },
    format="parquet",
    transformation_ctx="datasink4")

sink = glueContext.getSink(connection_type="s3", path=TARGET_PATH,
                           enableUpdateCatalog=True,
                           partitionKeys=["x", "y"])
sink.setFormat("glueparquet")
sink.setCatalogInfo(catalogDatabase=DATABASE, catalogTableName=TABLE)
sink.writeFrame(dropnullfields3)

But now the data cannot be queried in Athena, and I get a strange error about the data structure, like this:

HIVE_METASTORE_ERROR: com.facebook.presto.spi.PrestoException: Error: < expected at the end of 'struct' (Service: null; Status Code: 0; Error Code: null; Request ID: null)

I tried recreating the table so that it contains only the new glueparquet files.

I have also tried running a crawler over the new glueparquet files, and the table the crawler generates can be queried. But whenever I populate the same table from the ETL job above, I always get this error...

You need to change the table's classification to glueparquet:

CREATE EXTERNAL TABLE `table_name`(
...
)
PARTITIONED BY ( 
...
)
ROW FORMAT SERDE 
'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
's3://cortisol-beta-log-bucket/service_log/'
TBLPROPERTIES (
'classification'='glueparquet')
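
If you would rather not drop and recreate the table, the classification can also be patched in place through the Glue API. Here is a minimal boto3 sketch (DATABASE and TABLE stand for your existing catalog entries; this snippet is my own suggestion, not part of the original fix):

import boto3

glue = boto3.client("glue")

# Read the current table definition from the Data Catalog.
table = glue.get_table(DatabaseName=DATABASE, Name=TABLE)["Table"]

# update_table only accepts TableInput keys, so filter the response down.
allowed = {"Name", "Description", "Owner", "Retention", "StorageDescriptor",
           "PartitionKeys", "TableType", "Parameters"}
table_input = {k: v for k, v in table.items() if k in allowed}

# Setting the classification lets Glue jobs add partitions to this table.
table_input.setdefault("Parameters", {})["classification"] = "glueparquet"

glue.update_table(DatabaseName=DATABASE, TableInput=table_input)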

Or, in CDK, you need to set the data format as follows:

dataFormat: new DataFormat({
    inputFormat: InputFormat.PARQUET,
    // Have to explicitly specify the classification string to allow Glue jobs to add partitions
    classificationString: new ClassificationString("glueparquet"),
    outputFormat: OutputFormat.PARQUET,
    serializationLibrary: SerializationLibrary.PARQUET
}),

Then you can use the following code, which will work with Athena:

glueContext.write_dynamic_frame.from_catalog(
    frame=last_transform,
    database=args["GLUE_DATABASE"],
    table_name=args["GLUE_TABLE"],
    transformation_ctx="datasink",
    additional_options={"partitionKeys": partition_keys, "enableUpdateCatalog": True},
)
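
For context, that call sits inside an ordinary Glue job script. A minimal skeleton, assuming job parameters named GLUE_DATABASE and GLUE_TABLE and a last_transform DynamicFrame built earlier in the job (names carried over from the snippet above):

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

# Job parameters; GLUE_DATABASE / GLUE_TABLE are assumed names.
args = getResolvedOptions(sys.argv, ["JOB_NAME", "GLUE_DATABASE", "GLUE_TABLE"])

sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

partition_keys = ["x", "y"]  # same partition columns as in the question

# ... build last_transform (a DynamicFrame) from your sources here ...

glueContext.write_dynamic_frame.from_catalog(
    frame=last_transform,
    database=args["GLUE_DATABASE"],
    table_name=args["GLUE_TABLE"],
    transformation_ctx="datasink",
    additional_options={"partitionKeys": partition_keys, "enableUpdateCatalog": True},
)

# Commit so that job bookmarks and transformation contexts are persisted.
job.commit()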
