Dynamic dictionary in pyspark



I am trying to build a dictionary dynamically in pyspark by reading the table structure from an Oracle database. Here is a simplified version of my code.

Predefined dictionary (convert_dict.py):

from pyspark.sql import functions as f

dateFormat = "yyyyMMdd"  # date pattern of the incoming data

conversions = {
    "COL1": lambda c: f.col(c).cast("string"),
    "COL2": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
    "COL3": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
    "COL4": lambda c: f.col(c).cast("float")
}
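
Each entry maps a column name to a function that takes the column name and returns a Spark Column expression, so a single entry can be applied like this (a minimal illustration; df is a hypothetical DataFrame with a COL2 string column):

# Hypothetical usage: look up the conversion for COL2 and apply it.
converted = df.withColumn("COL2", conversions["COL2"]("COL2"))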

Main program:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType
from convert_dict import conversions

spark = SparkSession.builder.appName("file_testing").getOrCreate()
table_name = "TEST_TABLE"
input_file_path = "file:///c:/Desktop/foo.txt"
sql_query = ("(select listagg(column_name, ',') within group (order by column_id) col "
             "from user_tab_columns where table_name = '" + table_name + "' "
             "and column_name not in ('COL10', 'COL11', 'COL12')) table_columns")

struct_schema = StructType([
    StructField("COL1", StringType(), True),
    StructField("COL2", StringType(), True),
    StructField("COL3", StringType(), True),
    StructField("COL4", StringType(), True),
])

data_df = spark.read.schema(struct_schema).option("sep", ",").option("header", "true").csv(input_file_path)
validateData = data_df.withColumn(
    "dataTypeValidations",
    f.concat_ws(",",
        *[
            f.when(
                v(k).isNull() & f.col(k).isNotNull(),
                f.lit(k + " not valid")
            ).otherwise(f.lit("None"))
            for k, v in conversions.items()
        ]
    )
)

data_temp = validateData
for k, v in conversions.items():
    data_temp = data_temp.withColumn(k, v(k))
data_temp.show()
spark.stop()

If I change the above code to generate the dictionary dynamically from the database:

DATEFORMAT = "yyyyMMdd"
dict_sql = """
(select column_name,
        case when data_type = 'VARCHAR2' then 'string'
             when data_type in ('DATE', 'TIMESTAMP(6)') then 'date'
             when data_type = 'NUMBER' and NVL(DATA_SCALE, 0) <> 0 then 'float'
             when data_type = 'NUMBER' and NVL(DATA_SCALE, 0) = 0 then 'int'
        end d_type
 from user_tab_columns
 where table_name = 'TEST_TABLE'
   and column_name not in ('COL10', 'COL11', 'COL12')) dict
"""
column_df = (spark.read.format("jdbc")
             .option("url", url)
             .option("dbtable", dict_sql)
             .option("user", user)
             .option("password", password)
             .option("driver", driver)
             .load())

conversions = {}
for row in column_df.rdd.collect():
    column_name = row.COLUMN_NAME
    column_type = row.D_TYPE
    if column_type == "date":
        conversions.update({column_name: lambda c: f.col(c)})
    elif column_type == "float":
        conversions.update({column_name: lambda c: f.col(c).cast("float")})
    elif column_type == "date":
        conversions.update({column_name: lambda c: f.from_unixtime(f.unix_timestamp(c, DATEFORMAT)).cast("date")})
    elif column_type == "int":
        conversions.update({column_name: lambda c: f.col(c).cast("int")})
    else:
        conversions.update({column_name: lambda c: f.col(c)})

When the dictionary is generated dynamically as above, the data type conversions do not work. For example, if "COL2" contains "20210731", the resulting data stays unchanged, i.e. it is not converted to a proper date, whereas the predefined dictionary works correctly.
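
The date expression itself does handle such values when applied directly; a minimal sanity check, assuming an active SparkSession named spark and DATEFORMAT = "yyyyMMdd":

# Build a one-row DataFrame and apply the same conversion the dictionary should produce.
df = spark.createDataFrame([("20210731",)], ["COL2"])
df.select(f.from_unixtime(f.unix_timestamp("COL2", DATEFORMAT)).cast("date").alias("COL2")).show()
# expected: 2021-07-31 as a DateType value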

What am I missing here, or is there a better way to implement a dynamically generated dictionary in pyspark?

There was a newbie mistake in my code: in the if-then-else block I had two separate branches testing column_type == "date", so the branch with the real date conversion was unreachable; the first test should have checked for "string" instead.
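
For reference, a minimal corrected sketch of the branch logic; the first test now checks for "string" and casts accordingly, mirroring the predefined dictionary:

conversions = {}
for row in column_df.rdd.collect():
    column_name = row.COLUMN_NAME
    column_type = row.D_TYPE
    if column_type == "string":
        conversions[column_name] = lambda c: f.col(c).cast("string")
    elif column_type == "float":
        conversions[column_name] = lambda c: f.col(c).cast("float")
    elif column_type == "date":
        conversions[column_name] = lambda c: f.from_unixtime(f.unix_timestamp(c, DATEFORMAT)).cast("date")
    elif column_type == "int":
        conversions[column_name] = lambda c: f.col(c).cast("int")
    else:
        conversions[column_name] = lambda c: f.col(c)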