如何使用pyspark将json对象插入到postgres表中的列中



我有一个json变量,如下所示,我需要将其插入postgres中表中的特定列中。我该怎么办

JSON变量是man_j,它低于格式

{"a_type":"Res", "display_type":"Res", "data_type":"AAA", 
"source_name":"na", "li_details":{"li_value":"na", "li_column":"na"}, 
"additional_info": {"d_name":"na",  
"description":"na", "program":"program2",  
"Author":"author2", "email":"na", "sum":"na", 
"file_name":"na","additional_files":"na", "notify_email":"na"}}

我在postgres表中有一个名为man_de的列,我需要为它插入值

Spark不直接提供单列更新。使用暂存区(在数据库级别(和单独更新json列(这比实际问题要复杂一些(,您的解决方案可能会变得复杂。

由于您也拥有其他列的数据,请将JSON变量转换为DF。将其与其他列的DF合并。现在,您可以直接附加一些内容。

  1. DF1->所有其他列
  2. DF2->JSON变量

加入DF1和DF2(希望你有一些PK要加入(按照RDBMS表中的顺序重新排列列。根据您的需要使用df.Write编写/附加

我不确定这是否是你想要的,假设你的连接是用PostGre配置的,那么你可以简单地调用下面的命令来执行任何sql操作

df = df.withColumn("new_json_column", json_variable) // I am assuming here you have the correct schema or else you can create null column to ensure there is no schema mismatch
//to send only 1 transaction
df = df.show(1)

df.write.mode("append").jdbc(jdbcUrl, "db.table_name", connectionProperties)

最新更新