我有一个Kafka Stream源和一个映射表,我想加入它们,然后将数据写入另一个Kavka主题。此作业全天候不间断运行。
我的问题是,我希望加入的Map表是按日期分区的,每天我都需要新更新的Map表来进行加入。
但当代码运行时,它会日复一日地使用相同的旧地图表,而不更新它
import java.text.SimpleDateFormat
object joiningDF{
def newDate: String = {
val dFormat = new SimpleDateFormat("yyyy-MM-dd")
dateFormat.format(System.currentTimeMillis)
}
def main(args: Array[String]): Unit = {
var date=newDate
val source =spark.readStream.
format("kafka").
option("kafka.bootstrap.servers", "....").
option("subscribe", "....").
option("startingOffsets", "latest").
load()
// MAP TABLE date variable is used to get new date daily
var map=spark.read.parquet("path/day="+date)
val joinDF=source.join(map,Seq("id"),"left")
val outQ = joinDF.
writeStream.
outputMode("append").
format("kafka").
option("kafka.bootstrap.servers", "...").
option("topic", "...").
option("checkpointLocation", "...").
trigger(Trigger.ProcessingTime("300 seconds")).
start()
outQ.awaitTermination()
}
}
有没有办法解决或解决问题?
您可以在连续处理模式下使用FileSystem源代码,该源代码监视一个目录,当文件的新版本准备就绪时,您可以将其原子式地移动到该目录中。这将为您提供一个可加入的更新流。