如何在pyspark数据帧读取方法中包括分区列



我正在根据镶木地板文件编写Avro文件。我已经阅读了如下文件:

正在读取数据

dfParquet = spark.read.format("parquet").option("mode", "FAILFAST")
.load("/Users/rashmik/flight-time.parquet")

正在写入数据

我已经写了Avro格式的文件如下:

dfParquetRePartitioned.write 
.format("avro") 
.mode("overwrite") 
.option("path", "datasink/avro") 
.partitionBy("OP_CARRIER") 
.option("maxRecordsPerFile", 100000) 
.save()

不出所料,我得到了由OP_CARRIER分区的数据。

从特定分区读取Avro分区的数据

在另一个作业中,我需要从上述作业的输出中读取数据,即从datasink/avro目录中读取数据。我正在使用以下代码从datasink/avro中读取

dfAvro = spark.read.format("avro") 
.option("mode","FAILFAST") 
.load("datasink/avro/OP_CARRIER=AA")

它成功读取数据,但正如预期的那样,OP_CARRIER列在dfAvro数据帧中不可用,因为它是第一个作业的分区列。现在我的要求是将OP_CARRIER字段也包括在第二个数据帧中,即dfAvro中。有人能帮我吗?

我引用的是spark文档中的文档,但我找不到相关信息。任何指针都将非常有用。

使用不同的别名复制相同的列值。

dfParquetRePartitioned.withColumn("OP_CARRIER_1", lit(df.OP_CARRIER)) 
.write 
.format("avro") 
.mode("overwrite") 
.option("path", "datasink/avro") 
.partitionBy("OP_CARRIER") 
.option("maxRecordsPerFile", 100000) 
.save()

这会给你想要的。但用了一个不同的别名。或者你也可以在阅读时这样做。如果位置是动态的,那么您可以很容易地附加列。

path = "datasink/avro/OP_CARRIER=AA"
newcol = path.split("/")[-1].split("=") 
dfAvro = spark.read.format("avro") 
.option("mode","FAILFAST") 
.load(path).withColumn(newcol[0], lit(newcol[1]))

如果值是静态的,那么在数据读取过程中添加它会更容易。

最新更新