结构化流带有定期更新的静态数据集



将流与静态数据集合并是结构化流的重要功能。但是在每批批次上,数据集将从数据源中刷新。由于这些来源并不总是那么动态,因此在指定的时间段内(或批处理数)缓存静态数据集将是性能增益。在指定的期间/批处理数之后,从缓存中从源中重新加载了数据集。

在Spark流中,我使用一个缓存的数据集对此进行了管理,并在指定数量的批量运行后取消了它,但是由于某种原因,这不再与结构化流有关。

有任何建议使用结构化流?

我有一个开发的解决方案,用于另一个问题流静态加入:如何定期刷新(毫无意义/持久)静态数据帧,这也可能有助于解决您的问题:

您可以通过使用结构化流提供的流程调度功能来做到这一点。

您可以通过创建"人造速率"来触发静态数据框架的清新(不透露式 - > load - > persist)。流式的流量会定期刷新静态数据集。这个想法是:

  1. 最初加载staticdataframe,并保留为var
  2. 定义一种刷新静态数据框的方法
  3. 使用"费率"以所需间隔(例如1小时)触发的流
  4. 阅读实际流数据并使用静态数据框架执行加入操作
  5. 在该速率流中有一个foreachBatch接收器,可以调用刷新方法

以下代码与Spark 3.0.1,Scala 2.12.10和Delta 0.7.0。

运行正常。
  // 1. Load the staticDataframe initially and keep as `var`
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()
  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }
  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]
  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")
  val joinDf = streamingDf.join(staticDf, "id")
  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()
  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

要有一个完整的示例,Delta表的创建如下:

  val deltaPath = "file:///tmp/delta/table"
  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")
  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)