如何在 Spark 结构化流式处理中将 kafka 时间戳值作为列包含在内



我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流架构的解决方案。我已经从 kafka 中提取了值字段并制作了数据帧。我的问题是,我还需要获取时间戳字段(来自 kafka)以及其他列。

这是我当前的代码:

val kafkaDatademostr = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe","csvstream")
  .load
val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
  .select("csv.*")
val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
    "split(value,',')[1] as DFW",
    "split(value,',')[2] as DTG",
    "split(value,',')[3] as CDF",
    "split(value,',')[4] as DFO",
    "split(value,',')[5] as SAD",
    "split(value,',')[6] as DER",
    "split(value,',')[7] as time_for",
    "split(value,',')[8] as fort")

如何从 kafka 获取时间戳并与其他列一起添加为列?

时间戳

包含在源架构中。 只需添加一个"选择时间戳"即可获取如下所示的时间戳。

val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")

在 Apache Spark 官方网页上,您可以找到指南:结构化流 + Kafka 集成指南(Kafka 代理版本 0.10.0 或更高版本)

在这里,您可以找到有关从 Kafka 加载的数据帧架构的信息。

Kafka 源代码中的每一行都有以下列:

  • 键 - 消息键
  • 值 - 消息值
  • 主题 - 名称消息主题
  • 分区 - 消息来自的分区
  • 偏移量 - 消息的偏移量
  • 时间戳 - 时间戳
  • 时间戳类型 时间戳类型

以上所有列都可供查询。在您的示例中,您只使用 value ,因此要获取时间戳,只需将timestamp添加到您的 select 语句中:

  val allFields = kafkaDatademostr.selectExpr(
    s"CAST(value AS STRING) AS csv",
    s"CAST(key AS STRING) AS key",
    s"topic as topic",
    s"partition as partition",
    s"offset as offset",
    s"timestamp as timestamp",
    s"timestampType as timestampType"
  )

就卡夫卡而言,我以JSON格式接收值。其中包含实际数据以及原始事件时间而不是卡夫卡时间戳。下面是架构。

val mySchema = StructType(Array(
      StructField("time", LongType),
      StructField("close", DoubleType)
    ))

为了使用 Spark 结构化流的水印功能,我必须将时间字段转换为时间戳格式。

val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)]
      .select(from_json($"value", mySchema).as("data"))
      .select(col("data.time").cast("timestamp").alias("time"),col("data.close"))

现在,您可以将时间字段用于窗口操作以及水印目的。

import spark.implicits._
val windowedData = df1.withWatermark("time","1 minute")
                      .groupBy(
                          window(col("time"), "1 minute", "30 seconds"),
                          $"close"
                      ).count()

我希望这个答案能澄清。

最新更新