通过 Athena 和/或 Glue 将 S3 数据 ETL 转换为 CSV



我有一个装满图像文件的 S3 存储桶 (com.example.myorg.images(,它们都遵循相同的命名约定:

<PRODUCT_ID>_<NUMBER>.jpg

其中<PRODUCT_ID>是长数字(RDS 表中的主键(,<NUMBER>始终是以下三个值之一:100、200 或 300。例如,存储桶可能包含:

  • 1394203949_100.jpg
  • 1394203949_200.jpg
  • 1394203949_300.jpg
  • 1394203950_100.jpg
  • 1394203950_200.jpg
  • 1394203950_300.jpg
  • 。等。

我想编写一个 Athena 或 Glue ETL 过程,该进程在 S3 存储桶中查询其中的所有图像,并以某种方式将UNIQUE<PRODUCT_ID>值提取到表或列表中。

据我了解,Athena 会将此表/列表备份为可下载的 CSV;如果为 true,那么我将按照我在命令行上需要的方式单独处理无标头 CSV。

例如,如果上面的 6 张图像是存储桶中唯一的图像,那么此过程将:

  1. 查询 S3 并获取包含13942039491394203950的表/列表
  2. 创建一个可下载的 CSV,如下所示:

可能是 S3 上的文件,甚至是内存中的文件:

1394203949,1394203950

由于之前没有使用雅典娜或胶水的经验,我正在尝试使用 Athena 查询来实现这一点,但我很难通过树木看到森林。

我在第一部分(S3 查询(的最佳尝试:

CREATE EXTERNAL TABLE IF NOT EXISTS products_with_thumbnails (
product_id string
) 
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
ESCAPED BY '\'
LINES TERMINATED BY 'n'
LOCATION 's3://com.example.myorg.images/';

我相信这会使用 S3 存储桶中所有内容的文件名设置我的内存表,但是:

  • 如何使此表仅包含唯一的产品 ID(无重复(?
  • 如何仅提取文件名的<PRODUCT_ID>段(1394203949而不是1394203949_100.jpg(?

我不偏爱雅典娜或Glue,并且对任何满足我需求的解决方案感到满意。最坏情况我可以编写一个在应用程序层完成所有这些 ETL 的 Lambda,但如果存在类似 Hive 或面向 ETL 的 AWS 服务来执行此类操作,我宁愿利用它!

提前感谢!

Athena 查询文件内部,而不是文件列表,因此仅使用 Athena 是行不通的(有一些方法可以滥用它来实现它,但它们会既昂贵又缓慢,不是您想要的(。

如果图像数量少于十万张左右,我认为您最好的选择是编写一个或多或少相当于aws s3 ls --recursive s3://some-bucket/ | perl -ne '/(d+)_d+.jpg$/ && print "$1n"' | uniq的脚本。

如果不止于此,我建议使用 S3 库存,也许还有雅典娜进行处理。您可以在此处找到有关如何启用 S3 清单并使用 Athena 查询清单的说明:https://docs.aws.amazon.com/AmazonS3/latest/dev/storage-inventory.html

设置 S3 清单后,您的查询可能如下所示:

SELECT DISTINCT regexp_extract(key, '(d+)_d+.jpg', 1)
FROM the_inventory_table_name

不过,编写处理清单的脚本可能比设置 Athena 表要少。不过,我真的建议使用 S3 清单,而不是在有很多对象时直接列出 S3。

看起来您可以在 S3 中创建按日期分区的 S3 清单的分区文件:

CREATE EXTERNAL TABLE my_inventory(
`bucket` string,
key string,
version_id string,
is_latest boolean,
is_delete_marker boolean,
size bigint,
last_modified_date timestamp,
e_tag string,
storage_class string,
is_multipart_uploaded boolean,
replication_status string,
encryption_status string,
object_lock_retain_until_date timestamp,
object_lock_mode string,
object_lock_legal_hold_status string
)
PARTITIONED BY (dt string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT  'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://com.example.myorg.mybucket/com.example.myorg.mybucket/com.example.myorg.mybucket-ORC/hive/';

然后,每当要查询该my_inventory表时,请首先通过为当前日期创建新分区来修复分区文件:

MSCK REPAIR TABLE my_inventory;

最后,您可以通过PrestoDB的类似SQL的语法进行查询:

SELECT key FROM my_inventory WHERE dt <= '<YYYY-MM-DD>-00-00';

其中<YYYY-MM-DD>YYYY-MM-DD格式的当前日期。

然后,您可以将查询结果下载为 CSV 文件,并根据需要对其进行处理。

最新更新