我已经看到了插入Hive表的方法,比如insertInto(table_name, overwrite =True
,但是我不知道如何处理下面的场景。
对于第一次运行,这样的数据框需要保存在一个表中,按'date_key'进行分区。可以有一个或多个分区,例如202201
和202203
+---+----------+
| id| date_key|
+---+----------+
| 1|202201 |
| 2|202203 |
| 3|202201 |
+---+----------+
对于随后的运行,数据也是这样进来的,我想使用date_key
+---+----------+
| id| date_key|
+---+----------+
| 4|202204 |
| 5|202203 |
| 6|202204 |
+---+----------+
你能不能帮我解释一下如何处理
- 如果在每次运行期间只有一个分区的新数据
- 如果在每次运行期间会有来自多个分区的新数据,就像上面的示例输入一样?
非常感谢你的帮助。如果我能更好地解释这个问题,请告诉我。
编辑:我不能使用df.write.partitionBy("date_key").insertInto(table_name)
,因为有一个错误说insertInto
不能与partitionBy
一起使用。
在这里的示例中,第一次运行将创建新的分区表data
。c2
为分区列
df1 = spark.createDataFrame([
(1, 'a'),
(2, 'b'),
], 'c1 int, c2 string')
df1.show()
df1.write.partitionBy('c2').mode('overwrite').saveAsTable('data')
/
c2=a
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
c2=b
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
第二次运行,你没有只要append
和insertInto
就行了。Spark知道你的c2
是分区列,你不需要通过partitionBy
告诉它,
df2 = spark.createDataFrame([
(1, 'a'),
(3, 'c'),
], 'c1 int, c2 string')
df2.show()
df2.write.mode('append').insertInto('data')
/
c2=a
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
c2=b
part-00000-7810a4aa-a5a1-4c4f-a09a-ef86a66041c9.c000.snappy.parquet
c2=c
part-00000-dcd9029e-8c65-4397-bca5-ab2691ece7ff.c000.snappy.parquet
如果该表是外部表,则可以使用以下代码将数据写入外部分区表
df.write.partitionBy("date_key").mode("append").option("path","/path/to/external/table/on/hdfs").saveAsTable("table_name_here")
如果它是一个hive管理表,那么你可以简单地使用saveAsTable
API如下
df.write.partitionBy("date_key").mode("append").saveAsTable("tableName")