我在S3存储桶中有数据,其中包含许多看起来像这样的JSON文件:
s3://bucket1/news/year=2018/month=01/day=01/hour=xx/
day
分区包含多个hour=xx
分区,每天一个小时一个。我在day
分区中的文件上运行胶水ETL作业,并创建胶dynamic_frame_from_options
。然后,我使用ApplyMapping.apply
应用一些映射,该映射像魅力一样工作。
但是,我想根据每个文件的分区创建一个包含hour
值的新列。我可以使用Spark创建具有常数的新列,但是,如何使此列将分区用作源?
df1 = dynamicFrame.toDF().withColumn("update_date", lit("new column value"))
edit1
AWS的文章有关如何使用分区数据在创建DynamicFrame之前使用胶水爬车手,然后从胶水目录中创建dynamicFrame
。我需要直接从S3源创建dynamicFrame
。在此处输入链接描述
我并没有真正遵循您需要做的事情。如果您的文件在其上有partonton,或者只有在使用create_dynamic_frame .from_catalog
时,您是否已经有hour
值?您可以执行df1["hour"]
还是df1.select_fields["hour"]
?
如果您在ts(timestamp in yyyymmddhh format)
上有数据的数据,则不需要导入任何libs,您可以在Spark中使用Pure Python执行。
示例代码。首先,我创建一些将填充我的数据框的值。然后创建一个类似于下面的新变量。
df_values = [('2019010120',1),('2019010121',2),('2019010122',3),('2019010123',4)]
df = spark.createDataFrame(df_values,['yyyymmddhh','some_other_values'])
df_new = df.withColumn("hour", df["yyyymmddhh"][9:10])
df_new.show()
+----------+-----------------+----+
|yyyymmddhh|some_other_values|hour|
+----------+-----------------+----+
|2019010120| 1| 20|
|2019010121| 2| 21|
|2019010122| 3| 22|
|2019010123| 4| 23|
+----------+-----------------+----+
我对AWS胶水不熟悉,如果给定的链接不适用于您的情况,那么您可以尝试查看以下解决方案是否适合您:
使用input_file_name获取文件名,然后使用regexp_extract
从文件名中获取所需的分区列:
from pyspark.sql.functions import input_file_name, regexp_extract
df2 = df1.withColumn("hour", regexp_extract(input_file_name(), "hour=(.+?)/", 1))
,据我了解您的问题,您想在给定的一天中以分区为单位构建数据帧。通常,如果您使用Apache Hive式划分路径,并且您的文件具有相同的架构,则不应有问题
ds = glueContext.create_dynamic_frame.from_options(
's3',
{'paths': ['s3://bucket1/news/year=2018/month=01/day=01/']},
'json')
或...
df = spark.read.option("mergeSchema", "true").json('s3://bucket1/news/year=2018/month=01/day=01/')
因此,如果它不起作用,则应检查您是否使用Apache Hive式分区路径,并且文件具有相同的模式。
您也可以尝试在胶水中使用boto3框架(可能对您有用(:
import boto3
s3 = boto3.resource('s3')
有用的链接:
https://docs.aws.amazon.com/glue/latest/dg/aws-glue-programming-etl-partitions.html
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
" ... AWS胶水不包括DynamicFrame中的分区列,仅包括数据。"
我们必须将S3键加载到新列中,并以编程方式对分区进行解码,以将我们想要的列创建到动态帧/数据框架中。创建后,我们可以根据需要使用它们。
ps:我已经针对镶木木材文件进行了测试。它不适用于JSON文件。
参考