将Spark数据帧保存为配置单元中的动态分区表



我有一个示例应用程序,用于将csv文件读取到数据帧中。可以使用以下方法将数据帧存储到镶木地板格式的Hive表中df.saveAsTable(tablename,mode)

上面的代码工作得很好,但我每天都有太多的数据,所以我想根据creationdate(表中的列(对配置单元表进行动态分区。

有没有任何方法可以对数据帧进行动态分区并将其存储到hive仓库中。希望避免使用hivesqlcontext.sql(insert into table partittioin by(date)....)对插入语句进行硬编码。

这个问题可以看作是对以下内容的扩展:如何将DataFrame直接保存到Hive?

我们非常感谢您的帮助。

我相信它的工作原理是这样的:

df是具有年、月和其他列的数据帧

df.write.partitionBy('year', 'month').saveAsTable(...)

df.write.partitionBy('year', 'month').insertInto(...)

我能够使用df.write().mode(SaveMode.Append).partitionBy("colname").saveAsTable("Table") 写入分区配置单元表

我必须启用以下属性才能使其工作。

hiveContext.setConf("hive.exec.dynamic.partition","true"(hiveContext.setConf("hive.exec.dynamic.partition.mode","nonstrict"(

我也遇到了同样的事情,但使用了以下技巧我解决了。

  1. 当我们对任何表执行分区操作时,分区列将区分大小写。

  2. 分区列应存在于具有相同名称的DataFrame中(区分大小写(。代码:

    var dbName="your database name"
    var finaltable="your table name"
    // First check if table is available or not..
    if (sparkSession.sql("show tables in " + dbName).filter("tableName='" +finaltable + "'").collect().length == 0) {
         //If table is not available then it will create for you..
         println("Table Not Present n  Creating table " + finaltable)
         sparkSession.sql("use Database_Name")
         sparkSession.sql("SET hive.exec.dynamic.partition = true")
         sparkSession.sql("SET hive.exec.dynamic.partition.mode = nonstrict ")
         sparkSession.sql("SET hive.exec.max.dynamic.partitions.pernode = 400")
         sparkSession.sql("create table " + dbName +"." + finaltable + "(EMP_ID        string,EMP_Name          string,EMP_Address               string,EMP_Salary    bigint)  PARTITIONED BY (EMP_DEP STRING)")
         //Table is created now insert the DataFrame in append Mode
         df.write.mode(SaveMode.Append).insertInto(empDB + "." + finaltable)
    }
    

它可以在SparkSession上以这种方式配置:

spark = SparkSession 
    .builder 
    ...
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") 
    .config("spark.hadoop.hive.exec.dynamic.partition.mode", "nonstrict") 
    .enableHiveSupport() 
    .getOrCreate()

或者您可以将它们添加到.properties文件中

Spark配置(至少在2.4中(需要spark.hadoop前缀,以下是Spark设置该配置的方式:

  /**
   * Appends spark.hadoop.* configurations from a [[SparkConf]] to a Hadoop
   * configuration without the spark.hadoop. prefix.
   */
  def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {
    SparkHadoopUtil.appendSparkHadoopConfigs(conf, hadoopConf)
  }

这对我很有用。我设置了这些设置,然后将数据放入分区表中。

from pyspark.sql import HiveContext
sqlContext = HiveContext(sc)
sqlContext.setConf("hive.exec.dynamic.partition", "true")
sqlContext.setConf("hive.exec.dynamic.partition.mode", 
"nonstrict")

这对我使用python和spark 2.1.0很有效。

不确定这是否是最好的方法,但它有效。。。

# WRITE DATA INTO A HIVE TABLE
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession 
    .builder 
    .master("local[*]") 
    .config("hive.exec.dynamic.partition", "true") 
    .config("hive.exec.dynamic.partition.mode", "nonstrict") 
    .enableHiveSupport() 
    .getOrCreate()
### CREATE HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS hive_df (col1 INT, col2 STRING, partition_bin INT)
USING HIVE OPTIONS(fileFormat 'PARQUET')
PARTITIONED BY (partition_bin)
LOCATION 'hive_df'
""")
spark.sql("""
INSERT INTO hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### CREATE NON HIVE TABLE (with one row)
spark.sql("""
CREATE TABLE IF NOT EXISTS non_hive_df (col1 INT, col2 STRING, partition_bin INT)
USING PARQUET
PARTITIONED BY (partition_bin)
LOCATION 'non_hive_df'
""")
spark.sql("""
INSERT INTO non_hive_df PARTITION (partition_bin = 0)
VALUES (0, 'init_record')
""")
###
### ATTEMPT DYNAMIC OVERWRITE WITH EACH TABLE
spark.sql("""
INSERT OVERWRITE TABLE hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("""
INSERT OVERWRITE TABLE non_hive_df PARTITION (partition_bin)
VALUES (0, 'new_record', 1)
""")
spark.sql("SELECT * FROM hive_df").show() # 2 row dynamic overwrite
spark.sql("SELECT * FROM non_hive_df").show() # 1 row full table overwrite
df1.write
   .mode("append")
   .format('ORC')
   .partitionBy("date")
   .option('path', '/hdfs_path')
   .saveAsTable("DB.Partition_tablename")

它将创建具有"0"的分区;日期";列值,也将从spark DF写入配置单元中的配置单元外部表。

最新更新