使用AWS Glue Job动态读取所有数据目录表



所以我设法创建了一个AWS Glue Crawler,它可以爬取我所有的表并将它们存储在一个数据目录表中。我的数据库大约有25个表,我可以看到它们。

我还设法创建了一个Glue作业,该作业将一个表复制到一个桶中,并将其保存为.csv文件。它工作得很好,看起来像这样:

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Script generated for node Postgres
Postgres_node1 = glueContext.create_dynamic_frame.from_catalog(
database="glue_database",
table_name="sensors",
transformation_ctx="Postgres_node1",
)
Postgres_node1 = Postgres_node1.repartition(1)
# Script generated for node ApplyMapping
ApplyMapping_node2 = ApplyMapping.apply(
frame=Postgres_node1,
mappings=[
("placeholder_data", "int", "placeholder_data", "int"),
("end_at", "timestamp", "end_at", "timestamp"),
("deleted", "boolean", "deleted", "boolean"),
("placeholder_data", "boolean", "placeholder_data", "boolean"),
("placeholder_data", "int", "placeholder_data", "int"),
("start_at", "timestamp", "placeholder_data", "timestamp"),
("placeholder_data", "int", "placeholder_data", "int"),
],
transformation_ctx="ApplyMapping_node2",
)
# Get the current date as a string
current_date = date.today().strftime("%Y-%m-%d")
# Script generated for node S3 bucket
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=ApplyMapping_node2,
connection_type="s3",
format="csv",
connection_options={
"path": f"s3://bucketname/postgres/{current_date}",  
"mode": "append", 
"partitionKeys": [],
},
transformation_ctx="S3bucket_node3",
)
job.commit()

但这只是一个表,在胶水可视化编辑器中,我似乎找不到一个"所有表"选项。是否有一种方法可以动态读取所有表并执行上面生成的代码,以便为我生成25 .csv ?当然,我需要编辑映射,但我假设我也可以从表中获取映射。但是我目前一直在尝试导入数据目录并从中读取所有表。

我的最终目标是顺便用Quicksight和Athena查询和可视化数据。

提前感谢!

您可以使用Glue脚本中的boto3客户端来完成此操作。使用visual studio不适合这个用例。因此,您需要将您的工作转换为基于脚本的工作。

下面的代码循环遍历glue目录中的表,然后为每个表编写一个DataFrame。

import boto3
client = boto3.client('glue')
def get_glue_tables(database=None):
next_token = ''
tables = []

while True:
response = client.get_tables(
DatabaseName=database,
NextToken=next_token
)

for table in response.get('TableList'):
tables.append(table.get('Name'))

next_token = response.get('NextToken')

if next_token is None:
return tables

通过这样做,您需要将映射应用到每个表。因此,如果需要对每个表执行不同的映射,那么为每个表设置一个作业会更有意义。但是,如果这不是问题,您可以使用这个函数来循环并将每一个写入一个DataFrame,如下所示:

for table in get_glue_tables(db_name):
DataCatalogtable_node1 = glueContext.create_dynamic_frame.from_catalog(
database=db_name,
table_name=table,
transformation_ctx="DataCatalogtable_node1",
)

s3_path = f"s3://bucketname/postgres/{current_date}",
S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
frame=DataCatalogtable_node1,
connection_type="s3",
format="csv",
connection_options={
"path": f"s3://bucketname/postgres/{current_date}",  
"mode": "append", 
"partitionKeys": [],
},
transformation_ctx="S3bucket_node3",
)