I am trying to encode data with SHA-256 by passing the algorithm name as a parameter to a Spark UDF, but I am getting the error below. Please find the program snippet and the error details below.
Code snippet:
package com.sample

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import java.security.MessageDigest
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.UserDefinedFunction
import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.Column

object Customer {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Customer-data").setMaster("local[2]").set("spark.executor.memory", "1g")
    val sc = new SparkContext(conf)
    val spark = SparkSession.builder().config(sc.getConf).getOrCreate()
    //val hash_algm = sc.getConf.get("halgm")
    val hash_algm = "SHA-256"
    val df = spark.read.format("csv").option("header", "true").load("file:///home/tcs/Documents/KiranDocs/Data_files/sample_data")
    spark.udf.register("encriptedVal1", encriptedVal)
    // calling encryption UDF function
    //val resDF1 = df.withColumn("ssn_number", encriptedVal(df("customer_id"))).show()
    val resDF2 = df.withColumn("ssn_number", encriptedVal(array("customer_id", hash_algm))).show()
    println("data set" + resDF2)
    sc.stop()
  }

  def encriptedVal = udf((s: String, s1: String) => {
    val digest = MessageDigest.getInstance(s1)
    val hash = digest.digest(s.getBytes("UTF-8"))
    DatatypeConverter.printHexBinary(hash)
  })
}
The error details are as follows:
2019-01-21 19:42:48 INFO SparkContext:54 - Invoking stop() from shutdown hook
Exception in thread "main" java.lang.ClassCastException: com.sample.Customer$$anonfun$encriptedVal$1 cannot be cast to scala.Function1
	at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:104)
	at org.apache.spark.sql.expressions.UserDefinedFunction.apply(UserDefinedFunction.scala:85)
	at com.sample.Customer$.main(Customer.scala:26)
	at com.sample.Customer.main(Customer.scala)
The problem here is how you call the defined UDF. You should call it like this:

val resDF1 = df.withColumn("ssn_number", encriptedVal(df.col("customer_id"), lit(hash_algm)))

because it accepts two Column objects (and both Columns must be of String type, as the UDF is defined over two Strings). Your UDF wraps a two-argument function, i.e. a scala.Function2; calling it with the single Column produced by array(...) makes Spark construct a one-argument ScalaUDF and attempt to cast the function to scala.Function1, which is exactly the ClassCastException in your stack trace. lit(hash_algm) wraps the plain String in a constant Column so it can be passed as the second argument.
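
For completeness, here is a minimal self-contained sketch of the corrected program. The CSV path and the customer_id column are taken from the question; the null-handling assumption is noted in the comments:

package com.sample

import java.security.MessageDigest
import javax.xml.bind.DatatypeConverter
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Customer {
  // Two-argument UDF: hashes the first argument using the algorithm named by the second.
  // Assumes customer_id is never null; a null value would throw a NullPointerException inside the UDF.
  def encriptedVal = udf((s: String, algorithm: String) => {
    val digest = MessageDigest.getInstance(algorithm)
    DatatypeConverter.printHexBinary(digest.digest(s.getBytes("UTF-8")))
  })

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Customer-data")
      .master("local[2]")
      .getOrCreate()

    val hash_algm = "SHA-256"
    val df = spark.read.format("csv").option("header", "true")
      .load("file:///home/tcs/Documents/KiranDocs/Data_files/sample_data")

    // Each UDF argument is its own Column; lit() turns the algorithm name into a constant Column.
    df.withColumn("ssn_number", encriptedVal(col("customer_id"), lit(hash_algm)))
      .show(false)

    spark.stop()
  }
}

As an aside, Spark's built-in sha2(col("customer_id"), 256) computes the same SHA-256 digest (as lowercase hex, whereas printHexBinary returns uppercase) without a UDF at all, which is usually preferable when the algorithm is fixed.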