I am creating a dynamic frame with create_dynamic_frame.from_options, pulling the data directly from S3.
However, my raw data has multiple partitions that do not show up in the schema this way, because they are not actually part of the data; they are part of the S3 folder structure.
If the partitions were part of the data I could use partitionKeys=["date"],
but date is not a column, it is a folder.
How can I detect these partitions, either with from_options
or through some other mechanism?
I cannot use a Glue crawler to detect the full schema including the partitions, because my data is nested too deeply for the crawler to handle.
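For reference, my call looks roughly like this (the bucket path and format here are just placeholders, not my real values):
datasource = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": ["s3://my-bucket/raw/"]},  # folders like .../date=2019-11-07/ sit under this prefix
    format="csv",
    format_options={"withHeader": True}
)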
Another way to handle the partitions is to register them in the Glue Data Catalog, either manually or programmatically.
You can use the Glue client inside a Lambda function.
The example below comes from a Lambda function that is triggered whenever a file lands in the data lake; it parses the file name to create partitions in the target table of the Glue catalog. (You can find the Adobe Analytics use case I took the code from in the AWS archives: amazon-archives/athena-adobe-datafeed-splitter.)
import os
import boto3


def create_glue_client():
    """Create and return a Glue client for the region this AWS Lambda job is running in"""
    current_region = os.environ['AWS_REGION']  # With Glue, we only support writing to the region where this code runs
    return boto3.client('glue', region_name=current_region)


def does_partition_exist(glue_client, database, table, part_values):
    """Test if a specific partition exists in a database.table"""
    try:
        glue_client.get_partition(DatabaseName=database, TableName=table, PartitionValues=part_values)
        return True
    except glue_client.exceptions.EntityNotFoundException:
        return False


def add_partition(glue_client, database_name, s3_report_base, key):
    """Add a partition to the target table for a specific date"""
    # key example: 01-xxxxxxxx_2019-11-07.tsv.gz
    partition_date = key.split("_")[1].split(".")[0]
    # partition_date = "2019-11-07"
    year, month, day = partition_date.split('-')
    # year, month, day = ["2019", "11", "07"]
    part = key[:2]
    # part = "01"
    if does_partition_exist(glue_client, database_name, "target_table", [year, month, day, part]):
        return
    # get the headers as a Python list from the csv file in S3
    # (get_headers and s3_headers_path are defined elsewhere in the original code)
    headers = get_headers(s3_headers_path)
    # create the new partition
    glue_client.create_partition(
        DatabaseName=database_name,
        TableName="target_table",
        PartitionInput={
            "Values": [year, month, day, part],
            "StorageDescriptor": storage_descriptor(
                [{"Type": "string", "Name": name} for name in headers],  # columns
                '%s/year=%s/month=%s/day=%s/part=%s' % (s3_report_base, year, month, day, part)  # location
            ),
            "Parameters": {}
        }
    )


def storage_descriptor(columns, location):
    """Data Catalog storage descriptor with the desired columns and S3 location"""
    return {
        "Columns": columns,
        "Location": location,
        "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
        "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
        "SerdeInfo": {
            "SerializationLibrary": "org.apache.hadoop.hive.serde2.OpenCSVSerde",
            "Parameters": {
                "separatorChar": "\t"
            }
        },
        "BucketColumns": [],  # Required or SHOW CREATE TABLE fails
        "Parameters": {}  # Required or create_dynamic_frame.from_catalog fails
    }
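The helper functions above are wired together by the Lambda handler. A minimal sketch of such a handler, assuming an S3 put-event trigger (the database name and report prefix are placeholders):
def lambda_handler(event, context):
    """Triggered by an S3 put event; registers a partition for each new file."""
    glue_client = create_glue_client()
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        # e.g. key = "reports/01-xxxxxxxx_2019-11-07.tsv.gz"
        file_name = key.split("/")[-1]
        add_partition(glue_client, "database_name", "s3://%s/reports" % bucket, file_name)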
For an existing file structure, you can use the function below to iterate over the file keys in your raw bucket and parse them to create the partitions:
s3 = boto3.client('s3')  # list_objects_v2 is a client (not resource) method


def get_s3_keys(bucket):
    """Get a list of keys in an S3 bucket."""
    # note: list_objects_v2 returns at most 1000 keys per call; use a paginator for larger buckets
    keys = []
    resp = s3.list_objects_v2(Bucket=bucket)
    for obj in resp['Contents']:
        keys.append(obj['Key'])
    return keys
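Combining this with the functions above, a one-off backfill could look roughly like this (the bucket, prefix, and database names are placeholders; outside Lambda you need AWS_REGION set or a region passed explicitly):
glue_client = create_glue_client()  # requires AWS_REGION in the environment
for key in get_s3_keys("raw-bucket-name"):
    file_name = key.split("/")[-1]
    if not file_name.endswith(".tsv.gz"):
        continue  # skip folder markers and unrelated objects
    add_partition(glue_client, "database_name", "s3://raw-bucket-name/reports", file_name)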
Then you can create the dynamic frame as follows:
datasource = glueContext.create_dynamic_frame.from_catalog(
    database="database_name",
    table_name="target_table",
    transformation_ctx="datasource1",
    push_down_predicate="(year='2020' and month='5' and day='4')"
)
Another workaround is the attachFilename
option (similar to input_file_name
in Spark), which exposes the file path of each record as a column, so you can parse the partition values manually from the path string.
df = glueContext.create_dynamic_frame.from_options(
    connection_type='s3',
    connection_options={"paths": paths, "groupFiles": "none"},
    format="csv",
    format_options={
        "withHeader": True,
        "multiLine": True,
        "quoteChar": '"',
        "attachFilename": "source_file"
    },
    transformation_ctx="table_source_s3"
)
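Since source_file then holds the full S3 path of each record, the partition values can be extracted from it with ordinary Spark string functions. A minimal sketch, assuming Hive-style folders such as .../date=2019-11-07/... (adjust the regex to your actual layout):
from pyspark.sql import functions as F

spark_df = df.toDF()  # DynamicFrame -> Spark DataFrame
spark_df = spark_df.withColumn("date", F.regexp_extract("source_file", r"date=([^/]+)", 1))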