基于列表或字典动态选择和别名Databricks中的列



我在Databricks(bronze(中有大量的原始delta表,我只想通过选择几列并适当地重命名它们来清理它们(在将它们保存到新数据库之前;silver(。

例如:

df.select(col("CUSTOMERNUMBER").alias('CustomerNumber'),
col("NAME1").alias('CustomerName'),
col("LANGUAGE").alias('Language'),            
col("TRANSACTIONTIMESTAMP").alias('TransactionTimestamp'))

然而,我想创建一个函数,它接受df和映射,并动态地进行选择。

大致如下:

mapCustomer = {'CUSTOMERNUMBER' : 'CustomerNumber', 
'NAME1': 'CustomerName', 
'LANGUAGE' : 'Language', 
'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'}
def map_col(df, mapping):
return df.select(mapping)
mapped_df = map_col(df, mapCustomer)
print(mapped_df)
# save df to new location (Silver layer)
# mapped_df.write.format('delta')....

如何才能做到这一点?

我得到一个错误:

Invalid argument, not a string or column: {'CUSTOMERNUMBER': 'CustomerNumber', 'NAME1': 'CustomerName', 'LANGUAGE': 'Language', 'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'} of type <class 'dict'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

我找到了一个似乎有效的解决方案:

def filter_data(df, mapping):
mylist = []
for key in mapping:
mykey = key
value =  mapping[key]
mylist.append(col(key).alias(value))
return df.select(mylist)
mapCustomer = {'CUSTOMERNUMBER' : 'CustomerNumber', 'NAME1': 'CustomerName', 'LANGUAGE' : 'Language', 'TRANSACTIONTIMESTAMP': 'TransactionTimestamp'}
filtered = filter_data(added_col, mapCustomer)
display(filtered)