AWS Glue transform



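I am trying to read Input.csv from an S3 bucket, get the distinct values (and do some other transformations), and then write the result to Target.csv, but I run into a problem when writing the data to Target.csv in the S3 bucket.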

Here is the code:

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
glueContext = GlueContext(SparkContext.getOrCreate())
dfnew = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://bucket_name/Input.csv"] }, format="csv" )
dfMod = dfnew.select_fields(["Col2","Col3"]).toDF().distinct()
dnFrame  = DynamicFrame.fromDF(dfMod, glueContext, "test_nest")
datasink = glueContext.write_dynamic_frame.from_options(frame = dnFrame, connection_type = "s3",connection_options = {"path": "s3://bucket_name/Target.csv"}, format = "csv", transformation_ctx ="datasink") 

This is the data in Input.csv:

Col1    Col2    Col3
1       1       -30.4
2       2       -30.5
3       3        6.70
4       4        5.89
5       4        6.89
6       4        6.70
7       4        5.89
8       4        5.89
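
As a sanity check on the expected result, selecting Col2 and Col3 and dropping duplicate rows from this sample should leave six rows. A plain-Python sketch of that check, assuming the sample is whitespace-delimited exactly as displayed here (SAMPLE and distinct_pairs are illustrative names, not part of the Glue job):

```python
# The sample rows from Input.csv, copied as displayed above.
SAMPLE = """\
Col1    Col2    Col3
1       1       -30.4
2       2       -30.5
3       3        6.70
4       4        5.89
5       4        6.89
6       4        6.70
7       4        5.89
8       4        5.89
"""


def distinct_pairs(text, columns=("Col2", "Col3")):
    """Return the distinct rows of the selected columns, in first-seen order."""
    lines = text.strip().splitlines()
    header = lines[0].split()
    # Positions of the requested columns in the header row.
    idx = [header.index(name) for name in columns]
    seen = []
    for line in lines[1:]:
        fields = line.split()
        row = tuple(fields[i] for i in idx)
        if row not in seen:
            seen.append(row)
    return seen


rows = distinct_pairs(SAMPLE)
print(len(rows))  # 6
for row in rows:
    print(row)
```

Values are compared as strings here, so 6.70 and 6.7 would count as different rows; the sample contains no such case.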

Error:

val dfmod = dfnew.select_fields(["Col2","Col3"]).toDF().distinct().show()
    ^
SyntaxError: invalid syntax

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/amazon/bin/runscript.py", line 92, in <module>
    while "runpy.py" in new_stack.tb_frame.f_code.co_filename:
AttributeError: 'NoneType' object has no attribute 'tb_frame'

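My understanding is that this happens because I am using create_dynamic_frame_from_options rather than from_catalog. How can I get the required functionality while still using from_options (since my data is a CSV file in S3)?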

IAM (Glue service policy):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::bucket_Name/Output/**/**/*"
            ]
        }
    ]
}

S3 bucket policy:

{
    "Version": "2012-10-17",
    "Id": "Policy***",
    "Statement": [
        {
            "Sid": "Stmt1***",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::account_number:root"
            },
            "Action": "s3:*",
            "Resource": "arn:aws:s3:::bucket_name"
        }
    ]
}

Please help.

Syntax error

val dfMod = dfnew.select_fields(["Col2","Col3"]).toDF().distinct().show()

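The line above can be corrected as follows. val is Scala syntax and is not valid in Python, and show() is not needed: it only prints the rows and returns None. Without them, the expression simply returns a DataFrame, which we convert to a DynamicFrame before passing it to write_dynamic_frame. The import statement from awsglue.dynamicframe import DynamicFrame is also needed at the top of the script.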

dfMod = dfnew.select_fields(["Col2","Col3"]).toDF().distinct()
dnFrame  = DynamicFrame.fromDF(dfMod, glueContext, "test_nest")
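
Putting the fix together with the read and write steps from the question, the whole job might look like the sketch below. It assumes Input.csv is comma-separated with a header row (hence the withHeader format option, which the original code does not set), and run_job with its two parameters is a hypothetical wrapper introduced only for illustration.

```python
def run_job(input_path, output_prefix):
    """Read a CSV from S3, keep distinct (Col2, Col3) rows, write CSV to S3."""
    # Imported inside the function so this file can be loaded where the Glue
    # libraries are not installed; in a real job script these imports
    # normally sit at the top of the file.
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame

    glue_context = GlueContext(SparkContext.getOrCreate())

    # Assumption: the file has a header row, so withHeader is set to make
    # the reader use the first line as column names (Col1, Col2, Col3).
    source = glue_context.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [input_path]},
        format="csv",
        format_options={"withHeader": True},
    )

    # select_fields takes a list of field names. distinct() is a Spark
    # DataFrame method, hence the round trip through toDF() and fromDF().
    distinct_df = source.select_fields(["Col2", "Col3"]).toDF().distinct()
    result = DynamicFrame.fromDF(distinct_df, glue_context, "test_nest")

    # No show() in the chain above: it prints rows and returns None, so the
    # value handed to the writer must be the DataFrame itself.
    glue_context.write_dynamic_frame.from_options(
        frame=result,
        connection_type="s3",
        connection_options={"path": output_prefix},
        format="csv",
    )
```

In a job script this could be called as run_job("s3://bucket_name/Input.csv", "s3://bucket_name/Target.csv"). Note that Spark treats the output path as a prefix and writes one or more part files under it rather than a single file with that exact name.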
