I am looking for a way to write a string column as the jsonb type in PostgreSQL. My DynamicFrame has a string column that holds JSON data, and when I try to save it to Postgres with
DataSink0 = glueContext.write_dynamic_frame.from_catalog(frame = Transform0, database = "cms", table_name = "cms_public_listings", transformation_ctx = "DataSink0")
I get the following error:
An error occurred while calling o1623.pyWriteDynamicFrame.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 134.0 failed 4 times, most recent failure: Lost task 0.3 in stage 134.0 (TID 137, ip-172-31-27-18.ec2.internal, executor 24): java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "public".listings ([REMOVED_COLUMNS]) VALUES ([REMOVED_VALUES]) was aborted: ERROR: column "schema" is of type jsonb but expression is of type character varying
Hint: You will need to rewrite or cast the expression.
Position: 207 Call getNextException to see other errors in the batch.
I cannot change the schema to store a plain string, so either I get AWS Glue ETL to do this, or I have to fall back to a Python Shell job. I would much rather find a way to keep using PySpark with AWS Glue.
I prefer to use a native Spark DataFrame, because it allows me more customization. Using the stringtype connection property, I can write the JSON string field of the DataFrame into a jsonb column in the table. In this example my DataFrame has two fields.
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

sc = SparkContext.getOrCreate(SparkConf())
spark = SparkSession(sc)

df = spark.read.format('csv') \
    .option('delimiter', '|') \
    .option('header', 'True') \
    .load('your_path')
## some transformations...
url = 'jdbc:postgresql://your_host:5432/your_databasename'
properties = {'user': '*****',
              'password': '*****',
              'driver': 'org.postgresql.Driver',
              'stringtype': 'unspecified'}
df.write.jdbc(url=url, table='your_tablename', mode='append', properties=properties)
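One caveat with stringtype=unspecified: Postgres will still reject any row whose string is not valid JSON when it casts the value to jsonb, and a single bad row aborts the whole batch. A minimal pre-flight check (plain Python, the sample rows are illustrative only; in Spark this could be wrapped in a UDF to filter the DataFrame first) might look like:

```python
import json

def is_valid_json(s):
    """Return True if s parses as JSON, else False (None is treated as invalid)."""
    if s is None:
        return False
    try:
        json.loads(s)
        return True
    except ValueError:
        return False

rows = ['{"a": 1}', '[1, 2, 3]', 'not json', None]
bad = [s for s in rows if not is_valid_json(s)]
print(bad)  # these rows would make the jsonb cast fail on insert
```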
Because the mode property is set to append, you should create the table in PostgreSQL before running the script above, for example:
create table your_tablename
(
    my_json_field jsonb,
    another_field int
);
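Once the write succeeds, one way to confirm the values landed as real jsonb (and not text) is to query with a jsonb operator; the key name "status" here is only an illustration, substitute a key that exists in your JSON:

select my_json_field->>'status' as status,
       another_field
from your_tablename
limit 5;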