我试图使用以下代码代替 query.awaitTermination(( 在 Spark 中重新启动流式查询,下面的代码将位于无限循环中并查找触发器以重新启动查询,然后执行下面的代码。基本上我试图刷新缓存的df。
query.processAllavaialble()
query.stop()
//oldDF is a cached Dataframe created from GlobalTempView which is of size 150GB.
oldDF.unpersist()
val inputDf: DataFrame = readFile(spec, sparkSession) //read file from S3
or anyother source
val recreateddf = inputDf.persist()
//Start the query// here should i start query again by invoking readStream ?
但是当我查看火花文档时,它说
void processAllAvailable() ///documentation says This method is intended for testing/// Blocks until all available data in the source has been processed and committed to the sink. This method is intended for testing. Note that in the case of continually arriving data, this method may block forever. Additionally, this method is only guaranteed to block until data that has been synchronously appended data to a Source prior to invocation. (i.e. getOffset must immediately reflect the addition).
stop() Stops the execution of this query if it is running. This method blocks until the threads performing execution has stopped.
那么在不停止我的 Spark 流应用程序的情况下重新启动查询的更好方法
这对我有用。
下面是我在 Spark 2.4.5 中遵循的左外连接和左连接方案。下面的过程正在推动火花读取最新的维度数据更改。
流程适用于具有批处理维度的流联接(始终更新(
第 1 步:-
在启动 Spark 流式处理作业之前:- 确保维度批处理数据文件夹只有一个文件,并且该文件应至少有一个记录(由于某种原因放置空文件不起作用(/
第 2 步:- 启动流式处理作业并在 kafka 流中添加流记录
第 3 步:- 用值覆盖暗淡的数据(文件应同名不要更改,维度文件夹应只有一个文件( 注意:- 不要使用 Spark 写入此文件夹 使用 Java 或 Scala filesystem.io 覆盖文件或 bash 删除文件并替换为同名的新数据文件。
步骤4:- 在下一批中,Spark能够在加入kafka流时读取更新的维度数据...
示例代码:-
package com.databroccoli.streaming.streamjoinupdate
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
object BroadCastStreamJoin3 {
def main(args: Array[String]): Unit = {
@transient lazy val logger: Logger = Logger.getLogger(getClass.getName)
Logger.getLogger("akka").setLevel(Level.WARN)
Logger.getLogger("org").setLevel(Level.ERROR)
Logger.getLogger("com.amazonaws").setLevel(Level.ERROR)
Logger.getLogger("com.amazon.ws").setLevel(Level.ERROR)
Logger.getLogger("io.netty").setLevel(Level.ERROR)
val spark = SparkSession
.builder()
.master("local")
.getOrCreate()
val schemaUntyped1 = StructType(
Array(
StructField("id", StringType),
StructField("customrid", StringType),
StructField("customername", StringType),
StructField("countrycode", StringType),
StructField("timestamp_column_fin_1", TimestampType)
))
val schemaUntyped2 = StructType(
Array(
StructField("id", StringType),
StructField("countrycode", StringType),
StructField("countryname", StringType),
StructField("timestamp_column_fin_2", TimestampType)
))
val factDf1 = spark.readStream
.schema(schemaUntyped1)
.option("header", "true")
.csv("src/main/resources/broadcasttest/fact")
val dimDf3 = spark.read
.schema(schemaUntyped2)
.option("header", "true")
.csv("src/main/resources/broadcasttest/dimension")
.withColumnRenamed("id", "id_2")
.withColumnRenamed("countrycode", "countrycode_2")
import spark.implicits._
factDf1
.join(
dimDf3,
$"countrycode_2" <=> $"countrycode",
"inner"
)
.writeStream
.format("console")
.outputMode("append")
.start()
.awaitTermination
}
}
你的问题有点不清楚(第二段代码没有使用你想要坚持的 df,所以我不确定你打算如何集成它们......我假设加入?
我们遇到了类似的问题(使用 Spark 2.1(,并通过创建 Sink (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Sink.scala( 的自定义实现来解决它,其中数据在 addBatch 中加载。由于您的设置表明您一次只处理 1 个文件并且没有水印,因此您可以将逻辑塞入 addBatch 方法......虽然这有点笨拙(我相信我们的用例略有不同(。
如果 Spark 2.2 是一种选择,那么您很幸运。Spark 2.2 添加了"运行一次"触发器,允许您将 Spark 流式处理 API 用于批处理作业(这本质上是您要尝试执行的操作(。如果您修改写入流以使用此新触发器,则无限循环可能会起作用(尽管我从未尝试过(。最好使用外部计划程序以批处理模式运行流式处理作业。可以在此处阅读有关"运行一次"触发器的详细信息:https://databricks.com/blog/2017/05/22/running-streaming-jobs-day-10x-cost-savings.html
如果您使用的是 EMR,那么 Spark 2.2 尚不可用...但我听说它将在未来几周内发布(手指交叉(。
可以在此处找到一些完整的接收器实现示例:https://github.com/holdenk/spark-structured-streaming-ml/blob/master/src/main/scala/com/high-performance-spark-examples/structuredstreaming/CustomSink.scala