正如标题所述,我在将动态框架上的列从Epoch转换为时间戳时遇到了麻烦。
我已经尝试将其转换为数据帧并返回到动态帧,但它不工作。
这是我正在处理的:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.functions import udf
from awsglue.job import Job
## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "s3-sat-dth-prd", table_name = "s3_sat_dth_prd_vehicle", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("in", "int", "in", "int"), ("out", "int", "out", "int"), ("ts", "long", "ts", "long"), ("cam", "string", "cam", "string"), ("subclass", "string", "subclass", "string")], transformation_ctx = "applymapping1")
selectfields2 = SelectFields.apply(frame = applymapping1, paths = ["in", "out", "ts", "cam", "subclass"], transformation_ctx = "selectfields2")
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "s3-sat-dth-prd", table_name = "test_split_array_into_records_json", transformation_ctx = "resolvechoice3")
datasink4 = glueContext.write_dynamic_frame.from_catalog(frame = resolvechoice3, database = "s3-sat-dth-prd", table_name = "test_split_array_into_records_json", transformation_ctx = "datasink4")
job.commit()
我尝试创建一个数据帧tsconvert = resolvechoice3.toDF()
,并把它变成动态帧resolvechoice4 = DynamicFrame.fromDF(tsconvert, GlueContext, resolvechoice4)
;我得到一个语法错误的最后一个代码片段,我粘贴在resolvechoice4
的末尾。
找不到Glue中是否有内置的东西可以转换为时间戳。当Iìll确保数据正确写入S3时,Redshift将成为我的目标。
有没有人做过这样的事,可以给我引路?
提前感谢。
AWS Glue具有SQL函数(通过pyspark包导入),允许将epoch时间戳转换为人类可读或所需的日期格式。
的例子:
from pyspark.sql.functions import from_unixtime, unix_timestamp, col
resolvechoice3 = ResolveChoice.apply(frame = selectfields2, choice = "MATCH_CATALOG", database = "s3-sat-dth-prd", table_name = "test_split_array_into_records_json", transformation_ctx = "resolvechoice3")
tsconvert = resolvechoice3.toDF()
tsconverted= tsconvert.withColumn(col(tsColumnName),from_unixtime(col(tsColumnName)))
resolvechoice4 = DynamicFrame.fromDF(tsconverted, glue_context,"transformedDF")
根据您的需要,您可以使用pyspark.sql.functions类中的日期函数以类似的方式定义日期格式。