I created an AWS Glue Crawler and job. The goal is to transfer data from a Postgres RDS database table into a single .csv file in S3. Everything works, but I get a total of 19 files in S3. Every file is empty except for three, each of which contains one row of the database table along with the headers. So every row of the database gets written to a separate .csv file. What can I do here to specify that I want only one file, where the first row is the header and every following row is a row from the database?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("password", "string", "password", "string"), ("user_id", "string", "user_id", "string"), ("username", "string", "username", "string")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("user_id", "string", "user_id", "string"), ("username", "string", "username", "string"),("password", "string", "password", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
The database looks like this: [image of the table data]
In S3 it looks like this: [image of the S3 bucket]
An example .csv in S3 looks like this:
password,user_id,username
346sdfghj45g,user3,dieter
As I said, there is one file for each table row.
Edit: The multipart upload to S3 does not seem to work correctly. It only uploads the parts, but does not merge them together once it has finished. Here are the last lines of the job log:
19/04/04 13:26:41 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks
19/04/04 13:26:41 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
19/04/04 13:26:41 INFO Executor: Finished task 16.0 in stage 2.0 (TID 18). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:false s3://bucketname/run-1554384396528-part-r-00018
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00017
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:false s3://bucketname/run-1554384396528-part-r-00019
19/04/04 13:26:41 INFO Executor: Finished task 17.0 in stage 2.0 (TID 19). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00018
19/04/04 13:26:41 INFO Executor: Finished task 18.0 in stage 2.0 (TID 20). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO MultipartUploadOutputStream: close closed:true s3://bucketname/run-1554384396528-part-r-00019
19/04/04 13:26:41 INFO Executor: Finished task 19.0 in stage 2.0 (TID 21). 2346 bytes result sent to driver
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver from 172.31.20.76:39779 disconnected during shutdown
19/04/04 13:26:41 INFO CoarseGrainedExecutorBackend: Driver from 172.31.20.76:39779 disconnected during shutdown
19/04/04 13:26:41 INFO MemoryStore: MemoryStore cleared
19/04/04 13:26:41 INFO BlockManager: BlockManager stopped
19/04/04 13:26:41 INFO ShutdownHookManager: Shutdown hook called
End of LogType:stderr
Can you try the following?
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "gluedatabse", table_name = "postgresgluetest_public_account", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("user_id", "string", "user_id", "string"), ("username", "string", "username", "string"),("password", "string", "password", "string")], transformation_ctx = "applymapping1")
## Force one partition, so it can save only 1 file instead of 19
repartition = applymapping1.repartition(1)
datasink2 = glueContext.write_dynamic_frame.from_options(frame = repartition, connection_type = "s3", connection_options = {"path": "s3://BUCKETNAMENOTSHOWN"}, format = "csv", transformation_ctx = "datasink2")
job.commit()
Also, if you want to check how many partitions you currently have, you can try the code below. My guess is that there are 19, which is why 19 files were saved back to S3:
from awsglue.dynamicframe import DynamicFrame

## Convert to a PySpark DataFrame
dataframe = DynamicFrame.toDF(applymapping1)
## Print the number of partitions
print(dataframe.rdd.getNumPartitions())
## Convert back to a DynamicFrame
datasink2 = DynamicFrame.fromDF(dataframe, glueContext, "datasink2")
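If you prefer to write the single file directly from the Spark DataFrame (and keep the header row), here is a minimal sketch along the same lines; it assumes the applymapping1 frame from above and a placeholder output path, and note that Spark still names the object with a part-* prefix inside that folder:
from awsglue.dynamicframe import DynamicFrame

## Convert to a Spark DataFrame and collapse everything into one partition
dataframe = DynamicFrame.toDF(applymapping1).coalesce(1)
## Write a single CSV object with a header row (placeholder output path)
dataframe.write.mode("overwrite").option("header", "true").csv("s3://BUCKETNAMENOTSHOWN/output/")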
For those looking for a solution that keeps the visual editor usable, simply add a Custom Transform with the following code:
def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    ## Take the first (and only) frame from the incoming collection
    df = dfc.select(list(dfc.keys())[0]).toDF()
    ## Collapse to a single partition so only one file gets written
    df_reparted = df.repartition(1)
    dyf_reparted = DynamicFrame.fromDF(df_reparted, glueContext, "repart")
    return DynamicFrameCollection({"CustomTransform0": dyf_reparted}, glueContext)
This outputs a collection with a single DynamicFrame, which you can chain to a "Select From Collection" node.
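If you adapt this function for a script job outside the visual editor (where Glue Studio normally generates the surrounding boilerplate for you), you would also need the corresponding imports, roughly:
from awsglue.dynamicframe import DynamicFrame, DynamicFrameCollection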
If these are not large datasets, you can simply convert the Glue DynamicFrame (glue_dyf) to a Spark DataFrame (spark_df), and then the Spark DataFrame to a pandas DataFrame (pandas_df), as follows:
from awsglue.dynamicframe import DynamicFrame

spark_df = DynamicFrame.toDF(glue_dyf)
pandas_df = spark_df.toPandas()
## Writing directly to an s3:// path requires s3fs to be available on the workers
pandas_df.to_csv("s3://BUCKETNAME/subfolder/FileName.csv", index=False)
With this approach you don't have to worry about repartitioning small amounts of data. Large datasets are better handled as in the previous answers, by making use of Glue workers and Spark partitioning.
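If s3fs is not available on the Glue workers, an alternative sketch (the bucket and key below are placeholders) is to build the CSV in an in-memory buffer and upload it with boto3:
import io
import boto3

## Build the CSV in memory instead of writing straight to S3
csv_buffer = io.StringIO()
pandas_df.to_csv(csv_buffer, index=False)

## Upload the buffer contents as a single object (placeholder bucket and key)
s3 = boto3.client("s3")
s3.put_object(Bucket="BUCKETNAME", Key="subfolder/FileName.csv", Body=csv_buffer.getvalue().encode("utf-8"))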