Spark Streaming:
我正在接收一个由两列组成的数据帧。第一列是string
类型包含一个json
字符串和第二列包含schema
value
(第一列)。
Batch: 0
-------------------------------------------
+--------------------+--------------------+
| value| schema|
+--------------------+--------------------+
|{"event_time_abc...|`event_time_abc...|
+--------------------+--------------------+
表存储在val input
(不可变变量)中。我使用DataType.fromDDL
函数以以下方式将string
类型转换为json
数据帧:
val out= input.select(from_json(col("value").cast("string"),ddl(col("schema"))))
其中ddl
是一个预定义的函数,DataType.from_ddl(_:String):DataType
在spark(scala),但我已经注册了它,以便我可以在整个列上使用它,而不是字符串。我是这样做的:
val ddl:UserDefinedFunction = udf(DataType.fromDDL(_:String):DataType)
,这里是input
表的列、值和模式的最终转换。
val out = input.select(from_json(col("value").cast("string"),ddl(col("schema"))))
然而,我在这一行获得登记的例外:
val ddl:UserDefinedFunction = udf(DataType.fromDDL(_:String):DataType)
错误是:
java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported
如果我使用:
val out = input.select(from_json(col("value").cast("string"),DataType.fromDDL("`" + "event_time_human2"+"`" +" STRING")).alias("value"))
然后它工作,但正如你所看到的,我只使用string
(手动输入来自schema
列)在函数DataType.fromDDL(_:String):DataType
内。
那么我如何将这个函数应用到整个列而不注册,或者有任何其他方法来注册函数?
编辑:from_json
函数的第一个参数需要一个列,而第二个参数需要schema
而不是一个列。因此,我猜需要手动方法来解析每个value
字段和每个schema
字段。经过一番调查,我发现数据框架不支持数据类型。
既然在这个问题上设置了赏金。我想提供关于数据和模式的额外信息。模式以DDL(字符串类型)定义,可以使用from_DDL
函数进行解析。值是一个简单的json
字符串,将使用我们使用from_DDL
函数派生的模式进行解析。
基本思想是每个值都有自己的模式,需要用相应的模式进行解析。应该创建一个新列来存储结果。
数据:下面是一个数据示例:
value ={"event_time_human2":"09:45:00 +0200 09/27/2021"}
schema = " ' event_time_human2 ' STRING">
不需要转换为正确的时间格式。只要一个字符串就可以了。
在streaming context
中。所以,不是所有的方法都有效。
模式在运行之前被应用和验证,也就是说,在执行器上执行Spark代码之前。解析后的模式必须是执行计划的一部分,因此到目前为止,模式解析还不能按照您的预期动态执行。这就是你看到异常的原因:java.lang.UnsupportedOperationException: Schema for type org.apache.spark.sql.types.DataType is not supported
仅用于UDF。因此,这意味着DataType.fromDDL
应该只在驱动程序代码中使用,而不是在运行时/执行器代码中使用,这是您的UDF函数中的代码。在UDF函数中,Spark已经使用您在驱动程序端指定的模式执行了导入数据的转换。这就是不能在UDF中直接使用DataType.fromDDL
的原因,因为它基本上是无用的。所有这些都意味着在UDF函数中,我们只能使用基本的Scala/Java类型和Spark API提供的一些包装器,例如WrappedArray。
的另一种选择可能是收集所有模式的驱动程序。然后为每个模式创建一个包含pair (schema, dataframe)的映射。
请记住,将数据收集到驱动程序是一个昂贵的操作,并且只有当您有合理数量的唯一模式(即最多数千个)时才有意义。此外,将这些模式应用到每个数据集需要在驱动程序中依次完成,这也非常昂贵,因此重要的是要认识到,建议的解决方案只有在您拥有有限数量的唯一模式时才能有效地工作。
到目前为止,您的代码可能如下所示:import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.StructType
import spark.implicits._
val df = Seq(
("""{"event_time_human2":"09:45:00 +0200 09/27/2021", "name":"Pinelopi"}""", "`event_time_human2` STRING, name STRING"),
("""{"first_name":"Pin", "last_name":"Halen"}""", "first_name STRING, last_name STRING"),
("""{"code":993, "full_name":"Sofia Loren"}""", "code INT, full_name STRING")
).toDF("value", "schema")
val schemaDf = df.select("schema").distinct()
val dfBySchema = schemaDf.collect().map{ row =>
val schemaValue = row.getString(0)
val ddl = StructType.fromDDL(schemaValue)
val filteredDf = df.where($"schema" === schemaValue)
.withColumn("value", from_json($"value", ddl))
(schemaValue, filteredDf)
}.toMap
// Map(
// `event_time_human2` STRING, name STRING -> [value: struct<event_time_human2: string, name: string>, schema: string],
// first_name STRING, last_name STRING -> [value: struct<first_name: string, last_name: string>, schema: string],
// code INT, full_name STRING -> [value: struct<code: int, full_name: string>, schema: string]
// )
首先,我们使用schemaDf.collect()
收集每个唯一模式。然后我们遍历模式并根据当前模式过滤初始df。我们也使用from_json
转换当前字符串值列的特定模式。
注意我们不能有一个具有不同数据类型的公共列,这就是我们为每个模式创建不同df而不是一个最终df的原因。