使用csv的databricks自动加载器时,如何处理列名中的无效字符



我正试图设置一个databricks自动加载器流来读取大量csv文件,但我收到了错误Found invalid character(s) among " ,;{}()nt=" in the column names of your schema.,因为.csv列名包含空格。该消息建议通过setting table property 'delta.columnMapping.mode' to 'name'启用列映射,并将我介绍给这个文档页面,但我看不到实现这一点的方法。

这是设置流的代码:

stream = spark.readStream.format("cloudFiles")
.option('cloudFiles.format', 'csv')
.option('cloudFiles.schemaLocation', delta_loc)
.option("rescuedDataColumn", "_rescued_data")
.option('header', 'true')
.option('delimiter', '|')
.option('pathGlobFilter', f"*{file_code}*.csv")
.load(data_path)

我们在几种情况下都有这个问题,所以我们在阅读器中这样做:
.transform(lambda df: remove_bda_chars_from_columns(df))

UDF是:

def remove_bda_chars_from_columns(df):
return  df.select([col(x).alias(x.replace(" ", "_").replace("/", "").replace("%", "pct").replace("(", "").replace(")", "")) for x in df.columns])

最新更新