Here is my data:
configStr: String =
"
{
"validation": {
"target_feed": "tables.validation",
"data_validations":
[
{"program": "program1",
"test_description": "Checking if column1 are distinct",
"input_column": "column1",
"test": "distinctness",
"query": "select * from table1",
"condition": "None"},
{"program": "program12",
"test_description": "Checking if column2 are distinct",
"input_column": "column2",
"test": "Anomaly",
"query": "select * from table2",
"condition": "None"}
]
}
}"
I need to iterate over the data validations and use each of their fields. I plan to do it like this:
val resultsAsDf = conf("test")
.asInstanceOf[Map[String, Any]]("data_validations")
.asInstanceOf[Seq[Map[String, Any]]]
.map{ dv => someFunc(dv) }
.reduce(_.unionAll(_))
Now, for someFunc, which will handle the logic, I built something like this:
def someFunc(testCase: Map[String, Any]): Unit = {
  if (testCase("test") == "distinctness") {
    val tempDF = spark.sql(testCase("query"))
    val verificationResults: VerificationResult = VerificationSuite()
      .onData(tempDF)
      .addCheck(
        Check(CheckLevel.Error, testCase("program"))
          .hasDistinctness(testCase("column"), Check.IsOne))
      .run()
  } else {
    println("Nothing")
  }
}
Now I get the following errors:
<console>:54: error: type mismatch;
found : Any
required: String
val tempDF = spark.sql(testCase("query"))
^
<console>:60: error: type mismatch;
found : Any
required: String
Check(CheckLevel.Error, testCase("program"))
The problem is that the values I need from the map are of various types, which is why I chose Any. Is there a way to solve this, or am I going about it wrong?
By the way, SequenceToSequence was almost right, but mixed up which type needs to change. testCase returns Any where spark.sql(_) expects a String; the problem is not the return type of someFunc, but the values returned by the Map.
def someFunc(testCase: Map[String, Any]): Unit = {}

val tempDF = spark.sql(testCase("query"))

// Spark 3.1.1 ScalaDoc
def sql(sqlText: String): DataFrame
So when you look up a String key in the map, you get back an Any, not the String that spark.sql expects:

testCase: Map[String, Any]
testCase("string") // result: Any
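Putting that together, here is a minimal, Spark-free sketch of the fix: cast the Any value back to String at the point of use. The map literal below is my own stand-in for one entry of data_validations, not the author's parser output, and spark.sql itself is not invoked here.

```scala
// Stand-in for one parsed entry of data_validations (assumed shape).
val testCase: Map[String, Any] = Map(
  "program" -> "program1",
  "query"   -> "select * from table1"
)

// testCase("query") is statically typed Any, so cast it wherever a
// String is required, e.g. before handing it to spark.sql:
val query: String   = testCase("query").asInstanceOf[String]
val program: String = testCase("program").asInstanceOf[String]

println(query)
println(program)
```

The same cast applies to the other String-valued fields (test, input_column, condition) before they are passed to APIs that expect String.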
Reference: SparkSession ScalaDoc
Your someFunc returns Unit, which is similar to Java's "void" type, but spark.sql needs a String. You will need to modify your testCase so that it returns the string containing the query.
def someFunc(testCase: Map[String, Any]): String = { ... }
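For what it's worth, a safer variant than a bare asInstanceOf (my own suggestion, not from either answer) is to pattern-match on the value, so a wrongly typed or missing field fails with a readable message instead of a ClassCastException somewhere inside Spark. stringField below is a hypothetical helper, not part of any library:

```scala
// Hypothetical helper: extract a field that must be a String from the
// loosely typed config map, failing loudly otherwise.
def stringField(testCase: Map[String, Any], key: String): String =
  testCase.get(key) match {
    case Some(s: String) => s
    case Some(other)     => sys.error(s"field '$key' is not a String: $other")
    case None            => sys.error(s"missing field '$key'")
  }

val testCase: Map[String, Any] = Map(
  "test"  -> "distinctness",
  "query" -> "select * from table1"
)

// spark.sql(stringField(testCase, "query")) would now type-check.
println(stringField(testCase, "query"))
```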