在 Spark 数据帧写入方法中覆盖特定分区



我想覆盖特定的分区,而不是全部在 Spark 中。我正在尝试以下命令:

df.write.orc('maprfs:///hdfs-base-path','overwrite',partitionBy='col4')

其中 df 是具有要覆盖的增量数据的数据帧。

HDFS-base-path 包含主数据。

当我尝试上述命令时,它会删除所有分区,并在 hdfs 路径处插入 df 中存在的分区。

我的要求是仅覆盖指定 hdfs 路径处 df 中存在的那些分区。有人可以帮我吗?

终于!现在是 Spark 2.3.0 中的一个功能:火花-20236

要使用它,您需要将spark.sql.sources.partitionOverwriteMode设置设置为动态,数据集需要分区,写入模式overwrite。例:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.mode("overwrite").insertInto("partitioned_table")

我建议在编写之前根据分区列进行重新分区,这样您就不会最终每个文件夹有 400 个文件。

在 Spark 2.3.0 之前,最好的解决方案是启动 SQL 语句来删除这些分区,然后使用 mode append 写入它们。

这是一个常见问题。Spark 最高 2.0 的唯一解决方案是直接写入分区目录,例如

df.write.mode(SaveMode.Overwrite).save("/root/path/to/data/partition_col=value")

如果您使用的是 2.0 之前的 Spark,则需要使用以下方法阻止 Spark 发出元数据文件(因为它们会破坏自动分区发现(:

sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

如果您使用的是 1.6.2 之前的 Spark,则还需要删除 /root/path/to/data/partition_col=value 中的_SUCCESS文件,否则它的存在将中断自动分区发现。(我强烈建议使用 1.6.2 或更高版本。

您可以从我的 Spark Summit 关于防弹作业的演讲中获得有关如何管理大型分区表的更多详细信息。

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.toDF().write.mode("overwrite").format("parquet").partitionBy("date", "name").save("s3://path/to/somewhere")

这适用于 AWS Glue ETL 作业(Glue 1.0 - Spark 2.4 - Python 2(

在 insertInto 语句中添加 'overwrite=True' 参数可以解决此问题:

hiveContext.setConf("hive.exec.dynamic.partition", "true")
hiveContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").insertInto("database_name.partioned_table", overwrite=True)

默认情况下overwrite=False .将其更改为True允许我们覆盖df和partioned_table中包含的特定分区。这有助于我们避免用df覆盖partioned_table的全部内容。

使用 Spark 1.6...

HiveContext可以大大简化此过程。关键是必须先使用定义了分区的 CREATE EXTERNAL TABLE 语句在 Hive 中创建表。例如:

# Hive SQL
CREATE EXTERNAL TABLE test
(name STRING)
PARTITIONED BY
(age INT)
STORED AS PARQUET
LOCATION 'hdfs:///tmp/tables/test'

从这里开始,假设您有一个数据帧,其中包含特定分区(或多个分区(的新记录。可以使用 HiveContext SQL 语句使用此数据帧执行INSERT OVERWRITE,这将仅覆盖数据帧中包含的分区的表:

# PySpark
hiveContext = HiveContext(sc)
update_dataframe.registerTempTable('update_dataframe')
hiveContext.sql("""INSERT OVERWRITE TABLE test PARTITION (age)
                   SELECT name, age
                   FROM update_dataframe""")

注意:此示例中的update_dataframe具有与目标test表的架构匹配的架构。

使用此方法容易犯的一个错误是跳过 Hive 中的CREATE EXTERNAL TABLE步骤,仅使用数据帧 API 的写入方法创建表。特别是对于基于 Parquet 的表,不会正确定义表以支持 Hive 的INSERT OVERWRITE... PARTITION函数。

希望这有帮助。

在 Spark 2.3.1 上使用 Scala 对此进行了测试。上面的大多数答案都是写入 Hive 表。但是,我想直接写入磁盘,磁盘在此文件夹顶部有一个external hive table

首先是所需的配置

val sparkSession: SparkSession = SparkSession
      .builder
      .enableHiveSupport()
      .config("spark.sql.sources.partitionOverwriteMode", "dynamic") // Required for overwriting ONLY the required partitioned folders, and not the entire root folder
      .appName("spark_write_to_dynamic_partition_folders")

这里的用法:

DataFrame
.write
.format("<required file format>")
.partitionBy("<partitioned column name>")
.mode(SaveMode.Overwrite) // This is required.
.save(s"<path_to_root_folder>")

我尝试了以下方法来覆盖 HIVE 表中的特定分区。

### load Data and check records
    raw_df = spark.table("test.original")
    raw_df.count()
lets say this table is partitioned based on column : **c_birth_year** and we would like to update the partition for year less than 1925

### Check data in few partitions.
    sample = raw_df.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag")
    print "Number of records: ", sample.count()
    sample.show()

### Back-up the partitions before deletion
    raw_df.filter(col("c_birth_year") <= 1925).write.saveAsTable("test.original_bkp", mode = "overwrite")

### UDF : To delete particular partition.
    def delete_part(table, part):
        qry = "ALTER TABLE " + table + " DROP IF EXISTS PARTITION (c_birth_year = " + str(part) + ")"
        spark.sql(qry)

### Delete partitions
    part_df = raw_df.filter(col("c_birth_year") <= 1925).select("c_birth_year").distinct()
    part_list = part_df.rdd.map(lambda x : x[0]).collect()
    table = "test.original"
    for p in part_list:
        delete_part(table, p)

### Do the required Changes to the columns in partitions
    df = spark.table("test.original_bkp")
    newdf = df.withColumn("c_preferred_cust_flag", lit("Y"))
    newdf.select("c_customer_sk", "c_preferred_cust_flag").show()

### Write the Partitions back to Original table
    newdf.write.insertInto("test.original")

### Verify data in Original table
    orginial.filter(col("c_birth_year") <= 1925).select("c_customer_sk", "c_preferred_cust_flag").show()

Hope it helps.
Regards,
Neeraj

正如 jatin 所写的那样,您可以从 hive 和 path 中删除分区,然后附加数据由于我在它上浪费了太多时间,我为其他 Spark 用户添加了以下示例。我将 Scala 与 Spark 2.2.1 一起使用

  import org.apache.hadoop.conf.Configuration
  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkConf
  import org.apache.spark.sql.{Column, DataFrame, SaveMode, SparkSession}
  case class DataExample(partition1: Int, partition2: String, someTest: String, id: Int)
 object StackOverflowExample extends App {
//Prepare spark & Data
val sparkConf = new SparkConf()
sparkConf.setMaster(s"local[2]")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
val tableName = "my_table"
val partitions1 = List(1, 2)
val partitions2 = List("e1", "e2")
val partitionColumns = List("partition1", "partition2")
val myTablePath = "/tmp/some_example"
val someText = List("text1", "text2")
val ids = (0 until 5).toList
val listData = partitions1.flatMap(p1 => {
  partitions2.flatMap(p2 => {
    someText.flatMap(
      text => {
        ids.map(
          id => DataExample(p1, p2, text, id)
        )
      }
    )
  }
  )
})
val asDataFrame = spark.createDataFrame(listData)
//Delete path function
def deletePath(path: String, recursive: Boolean): Unit = {
  val p = new Path(path)
  val fs = p.getFileSystem(new Configuration())
  fs.delete(p, recursive)
}
def tableOverwrite(df: DataFrame, partitions: List[String], path: String): Unit = {
  if (spark.catalog.tableExists(tableName)) {
    //clean partitions
    val asColumns = partitions.map(c => new Column(c))
    val relevantPartitions = df.select(asColumns: _*).distinct().collect()
    val partitionToRemove = relevantPartitions.map(row => {
      val fields = row.schema.fields
      s"ALTER TABLE ${tableName} DROP IF EXISTS PARTITION " +
        s"${fields.map(field => s"${field.name}='${row.getAs(field.name)}'").mkString("(", ",", ")")} PURGE"
    })
    val cleanFolders = relevantPartitions.map(partition => {
      val fields = partition.schema.fields
      path + fields.map(f => s"${f.name}=${partition.getAs(f.name)}").mkString("/")
    })
    println(s"Going to clean ${partitionToRemove.size} partitions")
    partitionToRemove.foreach(partition => spark.sqlContext.sql(partition))
    cleanFolders.foreach(partition => deletePath(partition, true))
  }
  asDataFrame.write
    .options(Map("path" -> myTablePath))
    .mode(SaveMode.Append)
    .partitionBy(partitionColumns: _*)
    .saveAsTable(tableName)
}
//Now test
tableOverwrite(asDataFrame, partitionColumns, tableName)
spark.sqlContext.sql(s"select * from $tableName").show(1000)
tableOverwrite(asDataFrame, partitionColumns, tableName)
import spark.implicits._
val asLocalSet = spark.sqlContext.sql(s"select * from $tableName").as[DataExample].collect().toSet
if (asLocalSet == listData.toSet) {
  println("Overwrite is working !!!")
}

}

如果使用数据帧,则可能希望对数据使用 Hive 表。在这种情况下,您只需要调用方法

df.write.mode(SaveMode.Overwrite).partitionBy("partition_col").insertInto(table_name)

它将覆盖数据帧包含的分区。

没有必要指定格式(orc(,因为Spark将使用Hive表格式。

它在Spark版本1.6中工作正常

与其直接写入目标表,我建议您创建一个类似于目标表的临时表并在那里插入数据。

CREATE TABLE tmpTbl LIKE trgtTbl LOCATION '<tmpLocation';

创建表后,将数据写入tmpLocation

df.write.mode("overwrite").partitionBy("p_col").orc(tmpLocation)

然后,您将通过执行以下命令来恢复表分区路径:

MSCK REPAIR TABLE tmpTbl;

通过查询 Hive 元数据获取分区路径,如下所示:

SHOW PARTITONS tmpTbl;

trgtTbl中删除这些分区,并将目录从tmpTbl移动到trgtTbl

我建议您进行清理,然后使用Append模式编写新分区:

import scala.sys.process._
def deletePath(path: String): Unit = {
    s"hdfs dfs -rm -r -skipTrash $path".!
}
df.select(partitionColumn).distinct.collect().foreach(p => {
    val partition = p.getAs[String](partitionColumn)
    deletePath(s"$path/$partitionColumn=$partition")
})
df.write.partitionBy(partitionColumn).mode(SaveMode.Append).orc(path)

这将仅删除新分区。写入数据后,如果需要更新元存储,请运行以下命令:

sparkSession.sql(s"MSCK REPAIR TABLE $db.$table")

注意:deletePath假定hfds命令在您的系统上可用。

我的解决方案意味着从 Spark 数据帧开始覆盖每个特定分区。它跳过删除分区部分。我正在使用 pyspark>=3,并且正在 AWS s3 上编写:

def write_df_on_s3(df, s3_path, field, mode):
    # get the list of unique field values
    list_partitions = [x.asDict()[field] for x in df.select(field).distinct().collect()]
    df_repartitioned = df.repartition(1,field)
    for p in list_partitions:
        # create dataframes by partition and send it to s3
        df_to_send = df_repartitioned.where("{}='{}'".format(field,p))
        df_to_send.write.mode(mode).parquet(s3_path+"/"+field+"={}/".format(p))

这个简单函数的参数是 df、s3_path、分区字段和模式(覆盖或追加(。第一部分获取唯一的字段值:这意味着如果我按天对 df 进行分区,我会得到 df 中所有样片的列表。然后我正在重新分区 df。最后,我每天选择重新分区的 df,并将其写入其特定的分区路径。

您可以根据需要更改重新分区整数。

你可以做这样的事情来使作业可重入(幂等(:(在Spark 2.2上尝试过这个(

# drop the partition
drop_query = "ALTER TABLE table_name DROP IF EXISTS PARTITION (partition_col='{val}')".format(val=target_partition)
print drop_query
spark.sql(drop_query)
# delete directory
dbutils.fs.rm(<partition_directoy>,recurse=True)
# Load the partition
df.write
  .partitionBy("partition_col")
  .saveAsTable(table_name, format = "parquet", mode = "append", path = <path to parquet>)

对于>= Spark 2.3.0 :

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
data.write.insertInto("partitioned_table", overwrite=True)

相关内容

  • 没有找到相关文章

最新更新