I am trying to partition by merchant_id, year, and month, as you can check in the data link.
Partitioning by merchant_id alone works fine; that column already exists in my data source.
However, I don't have year or month columns. So I am trying to take created_at, split it, and add "year" and "month" columns to the same table, which would let me partition by (merchant_id, year, month).
Can anyone help me? Here is the code in Glue:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
DataSource0 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "bills", transformation_ctx = "DataSource0")
df0 = DataSource0.toDF()
dataframe1 = df0.withColumn("filedate", df0.created_at)
dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")
def map_function(dynamicRecord):
    date = dynamicRecord["filedate"].split("-")[0][-8:]
    dynamicRecord["year"] = date[0:4]
    dynamicRecord["month"] = date[4:6]
    dynamicRecord["day"] = date[6:8]
    return dynamicRecord
mapping3 = Map.apply(frame = dynamicframe2, f = map_function, transformation_ctx = "mapping3")
Transform2 = ApplyMapping.apply(frame = mapping3, mappings = [("op", "string", "bills_op", "string"), ("timestamp", "string", "bills_timestamp", "string"), ("id", "int", "bills_id", "int"), ("subscription_id", "int", "bills_subscription_id", "int"), ("customer_id", "int", "bills_customer_id", "int"), ("amount", "decimal", "bills_amount", "decimal"), ("created_at", "timestamp", "bills_created_at", "timestamp"), ("updated_at", "timestamp", "bills_updated_at", "timestamp"), ("status", "int", "bills_status", "int"), ("payment_method_id", "int", "bills_payment_method_id", "int"), ("due_at", "timestamp", "bills_due_at", "timestamp"), ("billing_at", "timestamp", "bills_billing_at", "timestamp"), ("installments", "int", "bills_installments", "int"), ("merchant_id", "int", "bills_merchant_id", "int"), ("year", "string", "year", "string"), ("month", "string", "month", "string"), ("day", "string", "day", "string")], transformation_ctx = "Transform2")
DataSource1 = glueContext.create_dynamic_frame.from_catalog(database = "recurrent", table_name = "clientes_ativos_enterprise", transformation_ctx = "DataSource1")
Transform0 = ApplyMapping.apply(frame = DataSource1, mappings = [("meta_id", "int", "filter_meta_id", "int"), ("meta_value", "string", "filter_meta_value", "string"), ("merc_id", "int", "filter_merc_id", "int")], transformation_ctx = "Transform0")
Transform1 = Join.apply(frame1 = Transform0, frame2 = Transform2, keys2 = ["bills_merchant_id"], keys1 = ["filter_merc_id"], transformation_ctx = "Transform1")
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options = {"path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/", "compression": "gzip", "partitionKeys": ["bills_merchant_id","year","month"]}, transformation_ctx = "DataSink0")
job.commit()
Here is the full error message:
Traceback (most recent call last):
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o151.pyWriteDynamicFrame.
: org.apache.spark.sql.AnalysisException:
Datasource does not support writing empty or nested empty schemas.
Please make sure the data schema has at least one or more column(s).
;
at org.apache.spark.sql.execution.datasources.DataSource$.org$apache$spark$sql$execution$datasources$DataSource$$validateSchema(DataSource.scala:733)
at org.apache.spark.sql.execution.datasources.DataSource.planForWriting(DataSource.scala:523)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:290)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:535)
at com.amazonaws.services.glue.SparkSQLDataSink$$anonfun$writeDynamicFrame$1.apply(DataSink.scala:522)
at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
at com.amazonaws.services.glue.util.FileSchemeWrapper$$anonfun$executeWithQualifiedScheme$1.apply(FileSchemeWrapper.scala:66)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWith(FileSchemeWrapper.scala:58)
at com.amazonaws.services.glue.util.FileSchemeWrapper.executeWithQualifiedScheme(FileSchemeWrapper.scala:66)
at com.amazonaws.services.glue.SparkSQLDataSink.writeDynamicFrame(DataSink.scala:521)
at com.amazonaws.services.glue.DataSink.pyWriteDynamicFrame(DataSink.scala:63)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/tmp/bills_partition_filtered.py", line 71, in <module>
DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform1, connection_type = "s3", format = "parquet", connection_options =
{
"path": "s3://analytics-plataforma-datalake/transformation-zone/partition_bills/",
"compression": "gzip",
"partitionKeys": [
"bills_merchant_id",
"year",
"month"
]
}
, transformation_ctx = "DataSink0")
File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 640, in from_options
format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 242, in write_dynamic_frame_from_options
format, format_options, transformation_ctx)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 265, in write_from_options
return sink.write(frame_or_dfc)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 35, in write
return self.writeFrame(dynamic_frame_or_dfc, info)
File "/opt/amazon/lib/python3.6/site-packages/awsglue/data_sink.py", line 31, in writeFrame
return DynamicFrame(self._jsink.pyWriteDynamicFrame(dynamic_frame._jdf, callsite(), info), dynamic_frame.glue_ctx, dynamic_frame.name + "_errors")
File "/opt/amazon/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/opt/amazon/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: '\nDatasource does not support writing empty or nested empty schemas.\nPlease make sure the data schema has at least one or more column(s).\n
Thanks, everyone!!
-
The error trace indicates a problem with the DataFrame schema. Before writing, call df.printSchema() to confirm that your schema is what you expect.
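For instance, a minimal sanity check on the joined frame from the job above, placed right before the sink (DynamicFrame exposes printSchema and count directly):

# Inspect the frame being written; an empty schema or zero rows here
# would explain the AnalysisException about empty schemas.
Transform1.printSchema()
print(Transform1.count())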
-
You should cast the created_at column to a date/datetime type.
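A minimal sketch of that cast on the Spark DataFrame, reusing df0 from the question and assuming created_at may arrive as a string rather than a timestamp:

from pyspark.sql.functions import to_timestamp

# Ensure created_at is a proper timestamp before deriving partition columns
dataframe1 = df0.withColumn("created_at_ts", to_timestamp("created_at"))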
-
Use the withColumn function and parse the year and month out of the created_at column, rather than slicing the value statically, which can lead to inconsistencies later (see the sketch after this answer).
dynamicRecord["year"] = date[0:4]
This is not a good way to parse dates.
Apply option #3 from the answer here: split a date into year, month, and day with inconsistent delimiters.
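As a rough sketch of that idea (not the linked answer verbatim), using Spark's year/month/dayofmonth functions on created_at once it is a timestamp, with zero-padded partition values, and reusing the names from the question's script:

from pyspark.sql.functions import col, year, month, dayofmonth, lpad

# Derive partition columns from the timestamp instead of slicing strings
dataframe1 = (
    df0
    .withColumn("year", year(col("created_at")).cast("string"))
    .withColumn("month", lpad(month(col("created_at")).cast("string"), 2, "0"))
    .withColumn("day", lpad(dayofmonth(col("created_at")).cast("string"), 2, "0"))
)
dynamicframe2 = DynamicFrame.fromDF(dataframe1, glueContext, "dynamicframe2")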