I'm interested in being able to retrieve the location value of a Hive table given a Spark object (SparkSession). One way to obtain this value is by parsing the location out of the output of the following SQL query:
describe formatted <table name>
I was wondering whether there is another way to obtain the location value without having to parse the output. An API would be great in case the output of the above command changes between Hive versions. If an external dependency is needed, which one would it be? Is there some sample Spark code that can obtain the location value?
Here is the correct answer:
import org.apache.spark.sql.catalyst.TableIdentifier
lazy val tblMetadata = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName, Some(schema)))
You can also use the .toDF method on desc formatted table and then filter from the dataframe.
Dataframe API:
scala> :paste
spark.sql("desc formatted data_db.part_table")
.toDF // already a DataFrame with 3 columns: col_name, data_type, comment
.filter('col_name === "Location") // filter on col_name
.collect()(0)(1)
.toString
Result:
String = hdfs://nn:8020/location/part_table
(or)
RDD API:
scala> :paste
spark.sql("desc formatted data_db.part_table")
.collect()
.filter(r => r(0).equals("Location")) //filter on r(0) value
.map(r => r(1)) //get only the location
.mkString // convert to string
.split("8020")(1) // adjust the split based on your namenode port, etc.
Result:
String = /location/part_table
Here is how to do it in PySpark:
(spark.sql("desc formatted mydb.myschema")
.filter("col_name=='Location'")
.collect()[0].data_type)
First approach
You can use input_file_name with the dataframe. It will give you the absolute file path of a part file.
import org.apache.spark.sql.functions.input_file_name
spark.read.table("zen.intent_master").select(input_file_name()).take(1)
Then extract the table path from it, as in the sketch below.
Second approach
You could call it more of a hack.
package org.apache.spark.sql.hive

import java.net.URI

import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.internal.{SessionState, SharedState}
import org.apache.spark.sql.SparkSession

class TableDetail {
  def getTableLocation(table: String, spark: SparkSession): URI = {
    val sessionState: SessionState = spark.sessionState
    val sharedState: SharedState = spark.sharedState
    val catalog: SessionCatalog = sessionState.catalog
    val sqlParser: ParserInterface = sessionState.sqlParser
    val client = sharedState.externalCatalog match {
      case catalog: HiveExternalCatalog => catalog.client
      case _: InMemoryCatalog =>
        throw new IllegalArgumentException("In-memory catalog doesn't support the Hive client API")
    }
    val idtfr = sqlParser.parseTableIdentifier(table)
    require(catalog.tableExists(idtfr), s"Table $idtfr does not exist")
    val rawTable = client.getTable(idtfr.database.getOrElse("default"), idtfr.table)
    rawTable.location
  }
}
Use ExternalCatalog
scala> spark
res15: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@4eba6e1f
scala> val metastore = spark.sharedState.externalCatalog
metastore: org.apache.spark.sql.catalyst.catalog.ExternalCatalog = org.apache.spark.sql.hive.HiveExternalCatalog@24b05292
scala> val location = metastore.getTable("meta_data", "mock").location
location: java.net.URI = hdfs://10.1.5.9:4007/usr/hive/warehouse/meta_data.db/mock
Use it as a reusable function in your Scala project:
def getHiveTablePath(tableName: String, spark: SparkSession): String = {
  import org.apache.spark.sql.functions._
  val sql: String = String.format("desc formatted %s", tableName)
  val result: DataFrame = spark.sql(sql).filter(col("col_name") === "Location")
  result.show(false) // just for debug purposes
  val info: String = result.collect().mkString(",")
  val path: String = info.split(',')(1)
  path
}
The caller would be:
println(getHiveTablePath("src", spark)) // you can prefix schema if you have
Result (I executed it locally, so file:/ appears below; on HDFS it would be hdfs://):
+--------+------------------------------------+-------+
|col_name|data_type                           |comment|
+--------+------------------------------------+-------+
|Location|file:/Users/hive/spark-warehouse/src|       |
+--------+------------------------------------+-------+
file:/Users/hive/spark-warehouse/src