I am using from_json with a schema that Encoders derives from a case class, but I only work with DataFrames (no Datasets), as shown below:
case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String])
val ProductsSchema = Encoders.product[MyProducts].schema
val df_products_output_final = df_products_output.withColumn("parsedProducts", from_json(col("afterImage"), ProductsSchema))
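- When PRICE is defined as Int, I get a null value in the field.
- When PRICE is defined as String, I get a string value in the field.
- With Int, the type shown in the DataFrame schema is correct.
What is the problem here?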
Code:
import org.json4s._
import org.json4s.jackson.JsonMethods._
import spark.implicits._
import org.apache.spark.sql.functions.{col, lit, when, from_json, map_keys, map_values, regexp_replace, coalesce}
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.{MapType, StringType, StructType, IntegerType}
case class MyMeta(op: String, table: String)
val metaSchema = Encoders.product[MyMeta].schema
case class MySales(NUM: Option[Integer], PRODUCT_ID: Option[String], DESCRIPTION: Option[String], OLD_FIELD_1: Option[String])
val salesSchema = Encoders.product[MySales].schema
case class MyProducts(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[Int], OLD_FIELD_1: Option[String])
val ProductsSchema = Encoders.product[MyProducts].schema
// Builds the after-image JSON: inserts keep the data as-is, other ops merge the key/data diff.
def getAfterImage(op: String, data: String, key: String, jsonOLD_TABLE_FIELDS: String): String = {
  val jsonOLD_FIELDS = parse(jsonOLD_TABLE_FIELDS)
  val jsonData = parse(data)
  val jsonKey = parse(key)
  op match {
    case "ins" =>
      compact(render(jsonData merge jsonOLD_FIELDS))
    case _ =>
      val Diff(changed, added, deleted) = jsonKey diff jsonData
      compact(render(changed merge deleted merge jsonOLD_FIELDS))
  }
}
val afterImage = spark.udf.register("callUDFAI", getAfterImage _)
val path = "/FileStore/tables/json_0006_file.txt"
val df = spark.read.text(path) // String.
val df2 = df.withColumn("value", from_json(col("value"), MapType(StringType, StringType)))
val df3 = df2.select(map_values(col("value")))
val df4 = df3
  .select($"map_values(value)"(0).as("meta"), $"map_values(value)"(1).as("data"), $"map_values(value)"(2).as("key"))
  .withColumn("parsedMeta", from_json(col("meta"), metaSchema))
  .drop("meta")
  .select(col("parsedMeta.*"), col("data"), col("key"))
  .withColumn("key2", coalesce(col("key"), lit(""" { "DUMMY_FIELD_XXX": ""} """)))
  .toDF()
  .cache()
// Still a DataFrame at this stage, not a Dataset.
val df_sales = df4.filter('table === "BILL.SALES")
val df_products = df4.filter('table === "BILL.PRODUCTS")
val df_sales_output = df_sales.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_1": ""} """)))
.select("afterImage")
val df_products_output = df_products.withColumn("afterImage", afterImage(col("op"), col("data"), col("key2") , lit(""" { "OLD_FIELD_A":"", "OLD_FIELD_B":""} """)))
.select("afterImage")
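val df_sales_output_final = df_sales_output.withColumn("parsedSales", from_json(col("afterImage"), salesSchema))
val df_products_output_final = df_products_output.withColumn("parsedProducts", from_json(col("afterImage"), ProductsSchema))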
df_products_output_final.show(false)
df_products_output_final.printSchema()
The quotes around the PRICE field values are what break this. from_json does not convert a quoted JSON string such as "4099" into an integer field, so PRICE comes back as null.
If you change the input data from
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "DESCRIPTION":"XXX" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"4099" }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":"4000" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":"3599" }}
to
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "DESCRIPTION":"XXX" }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":4099 }}
{ "meta":{ "op":"upd", "table":"BILL.PRODUCTS" }, "data":{ "PRICE":4000 }, "key":{ "PRODUCT_ID":"230117", "DESCRIPTION":"Hamsberry vintage tee, cherry", "PRICE":3599 }}
(the only difference is the quotes around the PRICE values), then the script produces the following output:
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
|afterImage |parsedProducts |
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
|{"DESCRIPTION":"XXX","PRODUCT_ID":"230117","PRICE":4099,"OLD_FIELD_A":"","OLD_FIELD_B":""} |{230117, XXX, 4099, null} |
|{"PRICE":4000,"PRODUCT_ID":"230117","DESCRIPTION":"Hamsberry vintage tee, cherry","OLD_FIELD_A":"","OLD_FIELD_B":""}|{230117, Hamsberry vintage tee, cherry, 4000, null}|
+--------------------------------------------------------------------------------------------------------------------+---------------------------------------------------+
root
|-- afterImage: string (nullable = true)
|-- parsedProducts: struct (nullable = true)
| |-- PRODUCT_ID: string (nullable = true)
| |-- DESCRIPTION: string (nullable = true)
| |-- PRICE: integer (nullable = true)
| |-- OLD_FIELD_1: string (nullable = true)
No more null values for PRICE!
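If the upstream data cannot be changed and PRICE keeps arriving quoted, one possible workaround is to parse it as a string and cast afterwards. The following is only a minimal sketch under assumptions: MyProductsRaw, rawSchema, sample and parsed are hypothetical names that are not part of the original script, and it is meant to be run in spark-shell or a notebook, where a case class can be declared inline.

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.{col, from_json}

// Hypothetical variant of MyProducts: PRICE is read as text instead of Int.
case class MyProductsRaw(PRODUCT_ID: Option[String], DESCRIPTION: Option[String], PRICE: Option[String], OLD_FIELD_1: Option[String])

// Reuses the existing session when run in a notebook or spark-shell.
val spark = SparkSession.builder().master("local[*]").appName("price-cast-sketch").getOrCreate()
import spark.implicits._

val rawSchema = Encoders.product[MyProductsRaw].schema

// A single after-image with a quoted PRICE, shaped like the original input.
val sample = Seq("""{"PRODUCT_ID":"230117","DESCRIPTION":"XXX","PRICE":"4099"}""").toDF("afterImage")

val parsed = sample
  .withColumn("parsedProducts", from_json(col("afterImage"), rawSchema))
  // Cast the parsed string to an integer in a separate top-level column.
  .withColumn("PRICE", col("parsedProducts.PRICE").cast("int"))

parsed.printSchema()
parsed.show(false)
```

The trade-off is that the struct keeps PRICE as a string and the integer lives in its own column, so this fits best when fixing the producer of the JSON is not an option.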