我有这行:
val decryptedDFData = sqlContext.read.json(patientTable.select("data").map(row => decrypt(row.toString())))
它只是从另一个DataFrame"patientTable"中选择"data"列,并逐行应用我的解密函数,然后创建另一个数据框架。我该如何:在知道架构不会被修复的情况下将加密函数应用于原始DataFrame(但"data"属性将始终存在),或者将新DataFrame的每一行作为结构插入到它以前对应的行中?
使用udf:
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
def decrypt(s: String) = s
val decryptUDF = udf(decrypt _)
patientTable.select(col("*"), decryptUDF(col("data").cast(StringType)))