如何使用来自数据集的一个列中的模式来解析另一列并使用Spark Streaming 2.2.0创建一个扁平的数据集?
我有以下源数据框架我从kafka
中读取消息创建的col1: string
col2: json string
col1 | col2
---------------------------------------------------------------------------
schemaUri1 | "{"name": "foo", "zipcode": 11111}"
schemaUri2 | "{"name": "bar", "zipcode": 11112, "id": 1234}"
schemaUri1 | "{"name": "foobar", "zipcode": 11113}"
schemaUri2 | "{"name": "barfoo", "zipcode": 11114, "id": 1235, "interest": "reading"}"
我的目标数据框架
name | zipcode | id | interest
--------------------------------
foo | 11111 | null | null
bar | 11112 | 1234 | null
foobar | 11113 | null | null
barfoo | 11114 | 1235 | reading
假设您有以下功能
//此函数返回一个构造型,代表给定的示意图的模式
public StructType getSchema(String schemaUri)
架构列无关紧要(无论如何都不能与Spark API一起使用)。所有相关的是您要提取的列:
val names = Seq("name", "zipcode", "id", "interest")
df.select(names.map(s => get_json_object($"col2", s"$$.${s}") as s): _*)
或:
import org.apache.spark.sql.types._
val superSchema = StructType(Seq(
StructField("name", StringType),
StructField("zipcode", IntegerType),
StructField("id", LongType),
StructField("interest", StringType)
))
df.select(from_json($"col2", superSchema).alias("_")).select($"_.*")
这是一个不确定的问题的一个很好的例子。在假期精神上,让我们无视缺乏尝试,而要专注于实际问题:
- 结构化流是...结构化的。这意味着它需要一个定义明确的模式。因此,它例如禁用架构推理。
-
作为字段参考提供的模式是没有用的:
- 它不能与现有API一起使用(示例
from_json
只能使用字符串文字)。 - 如果可以使用,则无法将此信息传播回计划者。
- 最后,它已经过时-
JSON
本身是自我描述的,并且不需要架构进行解析。Spark功能需要此信息的原因是,Planner需要在启动查询之前计算执行计划。
- 它不能与现有API一起使用(示例
即使您可以解析数据,您的评论引入了另一个问题:
我可能会有问题,因为我不知道时间
之前的架构
如果您不知道架构,那么使用结果数据集就几乎无法做到。归根结底,分析数据并不比JSON BLOB更好。
没有真正的问题 - 您要解决的问题到底是什么?这个问题再次缺少,但是我们可以怀疑两种情况:
-
您有一系列无关的数据流(不太可能)。这里可能的解决方案是编写数据以将KAFKA主题分开与Demultiplex
stream.select($"col1" as "topic", $"col2" as "value").writeStream .format("kafka") .option("kafka.bootstrap.servers", ...) .start()
并为每个主题创建单独的输入流,并具有已知的模式。
-
架构进化。在这种情况下,定义用于检索最新已知模式的API
- 如果所有变体都是兼容的,请使用该线程中已经显示的数据来解析数据。
- 否则将
getSchema
重新定义以将转换功能返回到最新的已知模式。
如果要升级 - 处理旧查询并创建新的查询。