Problem querying a view created with createOrReplaceTempView in Spark Structured Streaming



Below is my Spark Structured Streaming code inside foreachBatch:

    df.writeStream
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .foreachBatch((batchDF: DataFrame, batchId: Long) => {
        batchDF.persist()
        batchDF.createOrReplaceTempView("all_notifis")
        batchDF.write.mode(SaveMode.Append).saveAsTable("api_notifications_topics")
        val meta_data = spark.sql("select topic,partition,max(msg_timestamp) as msg_ts,max(off_set) as max_offset from all_notifis group by topic,partition")
        meta_data.write.mode(SaveMode.Append).saveAsTable("api_notifics_metadata")
        batchDF.unpersist()
      })
      .start()
      .awaitTermination()

Even though I created the temp view ("all_notifis"), Spark tries to fetch the table from the Hive default database and throws the following error:

Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'all_notifis' not found in database 'default';
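The lookup falls through to the default database because temporary views belong to a single SparkSession, not to the whole SparkContext. The sketch below illustrates that scoping with the public `spark.newSession()`, used here as a stand-in for the separate session a streaming query runs in (my understanding is that Spark runs each streaming query against a clone of the session that started it; treat that as an assumption about internals). The object name `TempViewScopeDemo` and the sample row are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object TempViewScopeDemo {
  // Returns (view visible in the session that created it, view visible in a sibling session).
  def viewVisibility(spark: SparkSession): (Boolean, Boolean) = {
    import spark.implicits._
    // Hypothetical sample row using the column names from the question.
    Seq(("t1", 0, 5L)).toDF("topic", "partition", "off_set")
      .createOrReplaceTempView("all_notifis")
    // newSession() shares the SparkContext but starts with its own, empty set of temp views.
    val other = spark.newSession()
    (spark.catalog.tableExists("all_notifis"), other.catalog.tableExists("all_notifis"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("temp-view-scope-demo")
      .getOrCreate()
    try println(viewVisibility(spark)) // prints (true,false)
    finally spark.stop()
  }
}
```

Because the second session has no temp view with that name, the analyzer looks for a permanent table `all_notifis` in `default` and reports `NoSuchTableException` when it does not find one.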

Can anyone help me solve this?

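It got resolved. Instead of

    val meta_data = spark.sql("select topic,partition,max(msg_timestamp) as msg_ts,max(off_set) as max_offset from all_notifis group by topic,partition")

we have to use

    val meta_data = batchDF.sparkSession.sql(s"select topic,partition,max(off_set) as max_offset,min(off_set) as min_offset,max(msg_timestamp) as msg_ts from all_notifis group by topic,partition")

The temp view is registered in the session that owns batchDF, and inside foreachBatch that session is not necessarily the same object as the outer spark, so the query has to go through batchDF.sparkSession to see the view.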
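Here is a minimal sketch of the whole foreachBatch body with that fix applied, assuming the column names from the question (`topic`, `partition`, `off_set`, `msg_timestamp`). The object `NotificationsSink` and its methods `batchMetadata` and `writeBatch` are hypothetical names. Pulling the aggregation into its own method makes it possible to check it against a static DataFrame without starting a stream.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

object NotificationsSink {
  // Builds per-(topic, partition) offset/timestamp metadata for one micro-batch.
  // The query runs through batchDF.sparkSession, the session where the view is registered.
  def batchMetadata(batchDF: DataFrame): DataFrame = {
    batchDF.createOrReplaceTempView("all_notifis")
    batchDF.sparkSession.sql(
      """select topic, partition,
        |       max(off_set) as max_offset,
        |       min(off_set) as min_offset,
        |       max(msg_timestamp) as msg_ts
        |from all_notifis
        |group by topic, partition""".stripMargin)
  }

  // Body for foreachBatch: cache the batch once, append raw rows and metadata, then release the cache.
  def writeBatch(batchDF: DataFrame, batchId: Long): Unit = {
    batchDF.persist()
    try {
      batchDF.write.mode(SaveMode.Append).saveAsTable("api_notifications_topics")
      batchMetadata(batchDF).write.mode(SaveMode.Append).saveAsTable("api_notifics_metadata")
    } finally {
      batchDF.unpersist()
    }
  }
}
```

It can be plugged into the original query as `.foreachBatch((batchDF: DataFrame, batchId: Long) => NotificationsSink.writeBatch(batchDF, batchId))`. The `try`/`finally` releases the cached batch even if one of the writes fails.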
