TL;DR
- 我正在尝试使用Glue[Studio]作业将许多S3数据文件合并为更少的数量
- 输入数据在Glue中编目,并可通过Athena查询
- Glue Job以";"成功";输出状态,但未创建输出文件
详细信息
输入我有数据是以每分钟一次的周期从刮刀中创建的。它将JSON(gzip(格式的输出转储到一个bucket中。我在Glue中对这个桶进行了编目,可以使用Athena查询它,不会有任何错误。这让我更加自信,我已经正确地设置了目录和数据结构。单独来看,这并不理想,因为它每天创建约1.4K个文件,这使得对数据的查询(通过Athena(非常缓慢,因为它们必须扫描太多、太小的文件
目标我想定期(可能是每周、每月一次,我还不确定(将每分钟一次的文件合并为更少的文件,这样查询就可以扫描更大、更少的文件(更快的查询(。
方法我的计划是创建一个Glue ETL作业(使用Glue Studio(来读取目录表,并写入新的S3位置(保持相同的JSON gzip格式,这样我就可以用合并的文件将Glue表重新指向新的S3位置(。我使用Glue Studio设置了作业,当我运行它时,它说成功了,但没有输出到指定的S3位置(不是空文件,只是什么都没有(。
卡住了我有点不知所措,因为(1(它说它成功了,(2(我甚至没有修改剧本(见下文(,所以我认为(也许是个坏主意(不是这样。
日志我试过浏览CloudWatch日志,看看它是否会有所帮助,但我没有从中得到太多。我怀疑这可能与这个条目有关,但我找不到方法来确认这一点或将任何内容更改为";"修复";它。(路径肯定存在,我可以在S3中看到它,目录可以通过Athena查询进行搜索,它是由Glue Studio脚本生成器自动生成的。(对我来说,这听起来像是我在某个地方选择了一个选项,让它认为我只想要某种";增量";扫描数据。但我没有(故意(,也找不到任何能让我看起来有的地方。
CloudWatch日志条目
21/03/13 17:59:39 WARN HadoopDataSource: Skipping Partition {} as no new files detected @ s3://my_bucket/my_folder/my_source_data/ or path does not exist
粘贴脚本
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "my_database", table_name = "my_table", transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "my_database", table_name = "my_table", transformation_ctx = "DataSource0")
## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://my_bucket/my_folder/consolidation/", "compression": "gzip", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = DataSource0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://my_bucket/my_folder/consolidation/", "compression": "gzip", "partitionKeys": []}, transformation_ctx = "DataSink0")
job.commit()
我首先研究的其他帖子
没有一个具有相同的问题;"成功";没有输出的作业。然而,其中一个创建了空文件,而另一个创建的文件太多。最有趣的方法是使用Athena为您创建新的输出文件(使用外部表(;然而,当我研究这一点时,输出格式选项似乎不会有JSON-zip(或没有gzip的JSON(,而是只有CSV和Parquet,这对我来说是不可取的。
如何使用AWS Glue 将许多CSV文件转换为Parquet
AWS Glue:ETL作业创建了许多空的输出文件
AWS胶粘作业-写入单个Parquet文件
AWS Glue,输出一个带有分区的文件
datasource_df=DataSource0.repartition(1(
DataLink0=glueContext.write_dynamic_frame.from_options
job.commit((