Below is my code inside foreachBatch in Spark Structured Streaming:
df.writeStream
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    batchDF.persist()
    // register the micro-batch as a temp view
    batchDF.createOrReplaceTempView("all_notifis")
    batchDF.write.mode(SaveMode.Append).saveAsTable("api_notifications_topics")
    // this lookup fails: spark.sql cannot find the view "all_notifis"
    val meta_data = spark.sql("select topic,partition,max(msg_timestamp) as msg_ts,max(off_set) as max_offset from all_notifis group by topic,partition")
    meta_data.write.mode(SaveMode.Append).saveAsTable("api_notifics_metadata")
    batchDF.unpersist()
  })
  .start()
  .awaitTermination()
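Even though I created the temp view "all_notifis", Spark tries to look the table up in the Hive default database and throws the following error:

Caused by: org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table or view 'all_notifis' not found in database 'default';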
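The same lookup failure can be reproduced without a stream. This is a minimal sketch, assuming Spark 3.x on the classpath; TempViewScopeDemo and check are hypothetical names. Temporary views are scoped to the SparkSession that registered them, and a streaming query runs on its own cloned session, so here spark.newSession() stands in for that session (an illustrative assumption, not the streaming internals themselves).

```scala
import org.apache.spark.sql.SparkSession

object TempViewScopeDemo {
  /** Registers a temp view in a second session and reports where it is visible. */
  def check(spark: SparkSession): (Boolean, Boolean) = {
    // Stand-in for the session the streaming query (and batchDF) belongs to
    val streamSession = spark.newSession()
    streamSession.range(3).toDF("off_set").createOrReplaceTempView("all_notifis")
    // tableExists also checks temporary views of the session it is called on
    (streamSession.catalog.tableExists("all_notifis"), spark.catalog.tableExists("all_notifis"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("temp-view-scope").getOrCreate()
    val (inStreamSession, inOuterSession) = check(spark)
    println(s"visible in batch session: $inStreamSession") // true
    println(s"visible in outer session: $inOuterSession")  // false
    spark.stop()
  }
}
```

Calling spark.sql("select * from all_notifis") on the outer session at that point fails in the same way as the streaming code.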
Can anyone help me solve this?
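It got resolved. Instead of

val meta_data = spark.sql("select topic,partition,max(msg_timestamp) as msg_ts,max(off_set) as max_offset from all_notifis group by topic,partition")

we have to use

val meta_data = batchDF.sparkSession.sql(s"select topic,partition,max(off_set) as max_offset,min(off_set) as min_offset,max(msg_timestamp) as msg_ts from all_notifis group by topic,partition")

The temp view is registered in the session that batchDF belongs to. For a streaming query that is a separate session from the outer spark, so spark.sql cannot see the view.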
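Putting the fix together, here is a sketch of the whole handler, assuming Spark 3.x. NotificationsSink and processBatch are illustrative names, and the rate source reshaped into topic/partition/off_set/msg_timestamp columns is a hypothetical stand-in for the original df. The handler is declared as a Function2 value, which sidesteps the foreachBatch overload ambiguity that inline lambdas can hit on Scala 2.12.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object NotificationsSink {
  // Per-batch logic; a Function2 value resolves to the Scala foreachBatch overload.
  val processBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
    // The micro-batch belongs to the query's session, not necessarily the outer `spark`
    val session = batchDF.sparkSession
    batchDF.persist()
    try {
      batchDF.createOrReplaceTempView("all_notifis")
      batchDF.write.mode(SaveMode.Append).saveAsTable("api_notifications_topics")
      // Resolve the view through the same session that registered it
      val metaData = session.sql(
        """select topic, partition,
          |       max(off_set) as max_offset,
          |       min(off_set) as min_offset,
          |       max(msg_timestamp) as msg_ts
          |from all_notifis
          |group by topic, partition""".stripMargin)
      metaData.write.mode(SaveMode.Append).saveAsTable("api_notifics_metadata")
    } finally {
      // Release the cached batch even if one of the writes fails
      batchDF.unpersist()
    }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("notifications").getOrCreate()
    // Hypothetical stand-in for the real `df`: the built-in rate source,
    // reshaped to the columns the metadata query expects
    val df = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
      .selectExpr("'demo' as topic", "cast(value % 2 as int) as partition",
        "value as off_set", "timestamp as msg_timestamp")

    df.writeStream
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .foreachBatch(processBatch)
      .start()
      .awaitTermination()
  }
}
```

Another option is to compute the metadata with the DataFrame API (batchDF.groupBy("topic", "partition").agg(...)), which needs no temp view at all.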