I'm using AWS Glue Studio to read data in Parquet format from an S3 bucket, join the data, and write the output in JSON format to another S3 bucket. The job runs successfully, but it creates dozens of empty (zero-byte) files in the target S3 bucket. I haven't changed the code manually; I only used the UI.
Interestingly, the UI shows the result correctly (in the Data preview tab), so the problem appears to be in the upload to S3.
Here is the Python code generated by Glue:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "X-parquet-s3", table_name = "entity1", transformation_ctx = "DataSource0"]
## @return: DataSource0
## @inputs: []
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "X-parquet-s3", table_name = "entity1", transformation_ctx = "DataSource0")
## @type: ApplyMapping
## @args: [mappings = [("archive_reason_id", "string", "entity1_archive_reason_id", "string"), ("archived_by_user_id", "string", "entity1_archived_by_user_id", "string"), ("owner_user_id", "string", "entity1_owner_user_id", "string"), ("archived_at", "timestamp", "entity1_archived_at", "timestamp"), ("origin", "string", "entity1_origin", "string"), ("last_story_at", "timestamp", "entity1_last_story_at", "timestamp"), ("created_at", "timestamp", "entity1_created_at", "timestamp"), ("posting_id", "string", "entity1_posting_id", "string"), ("origin_id", "string", "entity1_origin_id", "string"), ("is_confidential", "boolean", "entity1_is_confidential", "boolean"), ("contact_id", "string", "entity1_contact_id", "string"), ("last_advanced_at", "timestamp", "entity1_last_advanced_at", "timestamp"), ("stage_entered_at", "timestamp", "entity1_stage_entered_at", "timestamp"), ("entity1_id", "string", "entity1_entity1_id", "string"), ("account_id", "string", "entity1_account_id", "string"), ("requisition_id", "string", "entity1_requisition_id", "string"), ("is_deleted", "boolean", "entity1_is_deleted", "boolean"), ("sourced_by_user_id", "string", "entity1_sourced_by_user_id", "string"), ("is_hired", "boolean", "entity1_is_hired", "boolean"), ("row_updated_at", "timestamp", "entity1_row_updated_at", "timestamp"), ("account_stage_id", "string", "entity1_account_stage_id", "string")], transformation_ctx = "Transform1"]
## @return: Transform1
## @inputs: [frame = DataSource0]
Transform1 = ApplyMapping.apply(frame = DataSource0, mappings = [("archive_reason_id", "string", "entity1_archive_reason_id", "string"), ("archived_by_user_id", "string", "entity1_archived_by_user_id", "string"), ("owner_user_id", "string", "entity1_owner_user_id", "string"), ("archived_at", "timestamp", "entity1_archived_at", "timestamp"), ("origin", "string", "entity1_origin", "string"), ("last_story_at", "timestamp", "entity1_last_story_at", "timestamp"), ("created_at", "timestamp", "entity1_created_at", "timestamp"), ("posting_id", "string", "entity1_posting_id", "string"), ("origin_id", "string", "entity1_origin_id", "string"), ("is_confidential", "boolean", "entity1_is_confidential", "boolean"), ("contact_id", "string", "entity1_contact_id", "string"), ("last_advanced_at", "timestamp", "entity1_last_advanced_at", "timestamp"), ("stage_entered_at", "timestamp", "entity1_stage_entered_at", "timestamp"), ("entity1_id", "string", "entity1_entity1_id", "string"), ("account_id", "string", "entity1_account_id", "string"), ("requisition_id", "string", "entity1_requisition_id", "string"), ("is_deleted", "boolean", "entity1_is_deleted", "boolean"), ("sourced_by_user_id", "string", "entity1_sourced_by_user_id", "string"), ("is_hired", "boolean", "entity1_is_hired", "boolean"), ("row_updated_at", "timestamp", "entity1_row_updated_at", "timestamp"), ("account_stage_id", "string", "entity1_account_stage_id", "string")], transformation_ctx = "Transform1")
## @type: DataSource
## @args: [database = "X-parquet-s3", table_name = "contact_name", transformation_ctx = "DataSource1"]
## @return: DataSource1
## @inputs: []
DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "X-parquet-s3", table_name = "contact_name", transformation_ctx = "DataSource1")
## @type: ApplyMapping
## @args: [mappings = [("account_id", "string", "contact_name_account_id", "string"), ("row_updated_at", "timestamp", "contact_name_row_updated_at", "timestamp"), ("name", "string", "contact_name_name", "string"), ("contact_id", "string", "contact_name_contact_id", "string")], transformation_ctx = "Transform3"]
## @return: Transform3
## @inputs: [frame = DataSource1]
Transform3 = ApplyMapping.apply(frame = DataSource1, mappings = [("account_id", "string", "contact_name_account_id", "string"), ("row_updated_at", "timestamp", "contact_name_row_updated_at", "timestamp"), ("name", "string", "contact_name_name", "string"), ("contact_id", "string", "contact_name_contact_id", "string")], transformation_ctx = "Transform3")
## @type: Join
## @args: [keys2 = ["entity1_contact_id"], keys1 = ["contact_name_contact_id"], transformation_ctx = "Transform2"]
## @return: Transform2
## @inputs: [frame1 = Transform3, frame2 = Transform1]
Transform2 = Join.apply(frame1 = Transform3, frame2 = Transform1, keys2 = ["entity1_contact_id"], keys1 = ["contact_name_contact_id"], transformation_ctx = "Transform2")
## @type: SelectFields
## @args: [paths = ["contact_name_name", "entity1_entity1_id", "contact_name_contact_id"], transformation_ctx = "Transform0"]
## @return: Transform0
## @inputs: [frame = Transform2]
Transform0 = SelectFields.apply(frame = Transform2, paths = ["contact_name_name", "entity1_entity1_id", "contact_name_contact_id"], transformation_ctx = "Transform0")
## @type: DataSink
## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://X-sync-json/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
## @return: DataSink0
## @inputs: [frame = Transform0]
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://X-sync-json/", "partitionKeys": []}, transformation_ctx = "DataSink0")
job.commit()
How can I debug this output? The only error I can find in the error output (among dozens of logs) is the one below, and it doesn't tell me much:
2021-07-29 19:28:23,213 ERROR [Thread-7] util.UserData (UserData.java:getUserData(70)): Error encountered while try to get user data java.lang.NullPointerException at com.amazon.ws.emr.hadoop.fs.shaded.com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:871) at com.amazon.ws.emr.hadoop.fs.shaded.com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2726) at com.amazon.ws.emr.hadoop.fs.util.UserData.getUserData(UserData.java:66) at com.amazon.ws.emr.hadoop.fs.util.UserData.<init>(UserData.java:39) at com.amazon.ws.emr.hadoop.fs.util.UserData.ofDefaultResourceLocations(UserData.java:52) at com.amazon.ws.emr.hadoop.fs.util.AWSSessionCredentialsProviderFactory.buildSTSClient(AWSSessionCredentialsProviderFactory.java:52) at com.amazon.ws.emr.hadoop.fs.util.AWSSessionCredentialsProviderFactory.<clinit>(AWSSessionCredentialsProviderFactory.java:17) at com.amazon.ws.emr.hadoop.fs.rolemapping.DefaultS3CredentialsResolver.resolve(DefaultS3CredentialsResolver.java:22) at com.amazon.ws.emr.hadoop.fs.guice.CredentialsProviderOverrider.override(CredentialsProviderOverrider.java:25) at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.executeOverriders(GlobalS3Executor.java:171) at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:103) at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:189) at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:184) at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.getObjectMetadata(AmazonS3LiteClient.java:96) at com.amazon.ws.emr.hadoop.fs.s3.lite.AbstractAmazonS3Lite.getObjectMetadata(AbstractAmazonS3Lite.java:43) at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:220) at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:860) at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1440) at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.exists(EmrFileSystem.java:352) at com.amazonaws.services.glue.util.FileSystemFolder.listFiles(FileLister.scala:227) at com.amazonaws.services.glue.hadoop.DefaultPartitionFilesLister$$anonfun$_partitions$1.apply(FileSystemBookmark.scala:83) at com.amazonaws.services.glue.hadoop.DefaultPartitionFilesLister$$anonfun$_partitions$1.apply(FileSystemBookmark.scala:81) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:392) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:355) at com.amazonaws.services.glue.hadoop.DefaultPartitionFilesLister._partitions(FileSystemBookmark.scala:81) at com.amazonaws.services.glue.hadoop.PartitionFilesListerUsingBookmark.initialFiltering$1(FileSystemBookmark.scala:337) at com.amazonaws.services.glue.hadoop.PartitionFilesListerUsingBookmark.getNextBookmark$1(FileSystemBookmark.scala:469) at com.amazonaws.services.glue.hadoop.PartitionFilesListerUsingBookmark.partitions(FileSystemBookmark.scala:569) at com.amazonaws.services.glue.SparkSQLDataSource$$anonfun$getDynamicFrame$9.apply(DataSource.scala:699) at com.amazonaws.services.glue.SparkSQLDataSource$$anonfun$getDynamicFrame$9.apply(DataSource.scala:677) at 
com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89) at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:89) at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:82) at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:89) at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:676) at com.amazonaws.services.glue.DataSource$class.getDynamicFrame(DataSource.scala:94) at com.amazonaws.services.glue.SparkSQLDataSource.getDynamicFrame(DataSource.scala:658) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748)
That's actually not an error; it shows up in every Glue job… The Glue team assured me that you don't have to worry about it.
Could you repartition the DynamicFrame before writing?
repartitioned = Transform0.repartition(1)
You can change the 1 to the number of output files you ultimately want. That should solve your problem.
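For context, a minimal sketch of how the end of the generated script might look with that change (reusing the Transform0, glueContext, and DataSink0 names from the code above; 1 is just the target number of output files):
# Reduce the number of Spark partitions so the sink writes one JSON file
# instead of one (possibly empty) file per partition.
repartitioned = Transform0.repartition(1)
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = repartitioned, connection_type = "s3", format = "json", connection_options = {"path": "s3://X-sync-json/", "partitionKeys": []}, transformation_ctx = "DataSink0")
job.commit()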
Check the connection details in the AWS Glue Data Catalog. I ran into a similar problem, and the error logs pointed to a database connection issue. I corrected the database connection details in the Glue Catalog; I did not modify the script.
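In case it helps, here is a small sketch for checking a catalog connection programmatically rather than through the console (assuming boto3 with default credentials; "my-connection" is a placeholder for your actual connection name):
import boto3

# Placeholder connection name; replace with the connection your catalog actually uses.
glue = boto3.client("glue")
resp = glue.get_connection(Name="my-connection", HidePassword=True)
conn = resp["Connection"]
# Verify the connection type and properties (e.g. the JDBC URL) match what the job expects.
print(conn["ConnectionType"])
print(conn["ConnectionProperties"])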