如何使用来自数据集的一个列中的模式来解析另一列,并使用Spark Streaming 2.2.0创建一个扁平的数据集



如何使用来自数据集的一个列中的模式来解析另一列并使用Spark Streaming 2.2.0创建一个扁平的数据集?

我有以下源数据框架我从kafka

中读取消息创建的
col1: string
col2: json string

      col1    |   col2 
---------------------------------------------------------------------------
   schemaUri1 | "{"name": "foo", "zipcode": 11111}"
   schemaUri2 | "{"name": "bar", "zipcode": 11112, "id": 1234}"
   schemaUri1 | "{"name": "foobar", "zipcode": 11113}"
   schemaUri2 | "{"name": "barfoo", "zipcode": 11114, "id": 1235, "interest": "reading"}"

我的目标数据框架

name   | zipcode | id  | interest
-------------------------------- 
foo    | 11111  | null | null
bar    | 11112  | 1234 | null
foobar | 11113  | null | null
barfoo | 11114  | 1235 | reading

假设您有以下功能

//此函数返回一个构造型,代表给定的示意图的模式

public StructType getSchema(String schemaUri)

架构列无关紧要(无论如何都不能与Spark API一起使用)。所有相关的是您要提取的列:

val names = Seq("name", "zipcode", "id", "interest")
df.select(names.map(s => get_json_object($"col2", s"$$.${s}") as s): _*)

或:

import org.apache.spark.sql.types._
val superSchema = StructType(Seq(
  StructField("name", StringType),
  StructField("zipcode", IntegerType),
  StructField("id", LongType),
  StructField("interest", StringType)
))
df.select(from_json($"col2", superSchema).alias("_")).select($"_.*")

这是一个不确定的问题的一个很好的例子。在假期精神上,让我们无视缺乏尝试,而要专注于实际问题:

  • 结构化流是...结构化的。这意味着它需要一个定义明确的模式。因此,它例如禁用架构推理。
  • 作为字段参考提供的模式是没有用的:

    • 它不能与现有API一起使用(示例from_json只能使用字符串文字)。
    • 如果可以使用,则无法将此信息传播回计划者。
    • 最后,它已经过时-JSON本身是自我描述的,并且不需要架构进行解析。Spark功能需要此信息的原因是,Planner需要在启动查询之前计算执行计划。

即使您可以解析数据,您的评论引入了另一个问题:

我可能会有问题,因为我不知道时间

之前的架构

如果您不知道架构,那么使用结果数据集就几乎无法做到。归根结底,分析数据并不比JSON BLOB更好。

没有真正的问题 - 您要解决的问题到底是什么?这个问题再次缺少,但是我们可以怀疑两种情况:

  • 您有一系列无关的数据流(不太可能)。这里可能的解决方案是编写数据以将KAFKA主题分开与Demultiplex

    stream.select($"col1" as "topic", $"col2" as "value").writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", ...)
      .start()
    

    并为每个主题创建单独的输入流,并具有已知的模式。

  • 架构进化。在这种情况下,定义用于检索最新已知模式的API

    • 如果所有变体都是兼容的,请使用该线程中已经显示的数据来解析数据。
    • 否则将getSchema重新定义以将转换功能返回到最新的已知模式。

    如果要升级 - 处理旧查询并创建新的查询。

最新更新