我们有3个查询当前正在HIVE上运行。
使用Spark 2.1.0
我们正试图使用Spark SQL运行它,但通过使用SparkSession(就像用Scala代码包装一个Jar&然后使用Spark Submit提交)
现在举个例子:Query-1使用3个表(表-a,b&c)并插入表-->Output_Table_1
Query-2很少使用其他表(执行Joins)和output_table_1,后者是从Query-1的输出中填充的。这会产生output_table_2
类似地,Query-3使用很少的表&可以使用也可以不使用output_table_1和/或output_table _2(我们不确定是否仍在设计)
目前,我这样做的方式是在属性文件中写入所有查询,并使用Typesafe.ConfigFactory
在def main
中读取(请建议是否有更好的方法)
在def main(){}
中,我正在做的是:
val query_1 = spark.sql('query1')
query_1.write.mode("overwrite").insertInto("output_table_1")
现在,当我执行以下步骤时,它抛出一个错误-output_table_1未找到
val query_2 = spark.sql('query2')
query_2.write.mode("overwrite").insertInto("output_table_2")
类似地,对于第三个查询和表,我得到了相同的错误。所以基本上我是在尝试Chain the Queries&在后面的查询中使用Initial查询的输出。我无法将查询分解为较小的数据帧,因为它们很复杂。
如何应对这种情况。还要让我知道实现这一目标的最佳实践是什么?
为什么不进行
query_1.registerTempTable("output_table_1")