删除spark csv数据帧中的列



我有一个数据帧,我对它的所有字段进行串联。

串联后,它变成了另一个数据帧,最后我将其输出写入csv文件,并在其两列上进行分区。它的一列出现在第一个数据帧中,我不想将其包含在最终输出中。

这是我的代码:

val dfMainOutput = df1resultFinal.join(latestForEachKey, Seq("LineItem_organizationId", "LineItem_lineItemId"), "outer")
.select($"LineItem_organizationId", $"LineItem_lineItemId",
when($"DataPartition_1".isNotNull, $"DataPartition_1").otherwise($"DataPartition".cast(DataTypes.StringType)).as("DataPartition"),
when($"StatementTypeCode_1".isNotNull, $"StatementTypeCode_1").otherwise($"StatementTypeCode").as("StatementTypeCode"),
when($"FFAction_1".isNotNull, concat(col("FFAction_1"), lit("|!|"))).otherwise(concat(col("FFAction"), lit("|!|"))).as("FFAction"))
.filter(!$"FFAction".contains("D"))

在这里,我正在连接并创建另一个数据帧:

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.map(c => col(c)): _*).as("concatenated"))     

这就是我尝试过的

dfMainOutputFinal
.drop("DataPartition")
.write
.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header","true")
.option("encoding", "ufeff")
.option("codec", "gzip")
.save("path to csv")

现在我不希望DataPartition列出现在我的输出中。

我正在基于DataPartition进行分区,所以我没有得到,但因为DataPartition存在于主数据帧中,所以我在输出中得到了它。

问题1:如何忽略Dataframe 中的列

问题2:在写入实际数据之前,是否有任何方法可以在csv输出文件中添加"ufeff",以便我的编码格式变为UTF-8-BOM。

根据建议答案

这就是我尝试过的

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))

但是低于错误

<console>:238: error: value fieldNames is not a member of Seq[org.apache.spark.sql.types.StructField]
val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.filter(_ != "DataPartition").fieldNames.map(c => col(c)): _*).as("concatenated"))

以下是如果我必须在最终输出中删除两列的问题

val dfMainOutputFinal = dfMainOutput.select($"DataPartition","PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition","PartitionYear").map(c => col(c)): _*).as("concatenated"))

问题1:

您在df.write.partitionBy()中使用的列不会添加到最终的csv文件中。由于数据是在文件结构中编码的,因此会自动忽略它们。但是,如果您的意思是从concat_ws(从而从文件中)中删除它,则可以进行一个小的更改:

concat_ws("|^|", 
dfMainOutput.schema.fieldNames
.filter(_ != "DataPartition")
.map(c => col(c)): _*).as("concatenated"))

在这里,列DataPartition在连接之前被过滤掉。

问题2:

Spark似乎不支持UTF-8 BOM,并且在读取这种格式的文件时似乎会出现问题。除了在Spark完成后编写一个脚本来添加BOM字节之外,我想不出任何简单的方法来将BOM字节添加到每个csv文件中。我的建议是简单地使用普通的UTF-8格式。

dfMainOutputFinal.write.partitionBy("DataPartition","StatementTypeCode")
.format("csv")
.option("header", "true")
.option("encoding", "UTF-8")
.option("codec", "gzip")
.save("path to csv")

此外,根据Unicode标准,不建议使用BOM。

。。。UTF-8不需要也不建议使用BOM,但在UTF-8数据从使用BOM的其他编码形式转换的情况下,或者在将BOM用作UTF-8签名的情况下可能会遇到这种情况。

问题1:如何忽略Dataframe 中的列

答案:

val df = sc.parallelize(List(Person(1,2,3), Person(4,5,6))).toDF("age", "height", "weight")
df.columns
df.show()

+---+------+------+
|age|height|weight|
+---+------+------+
|  1|     2|     3|
|  4|     5|     6|
+---+------+------+

val df_new=df.select("age", "height")
df_new.columns
df_new.show()
+---+------+
|age|height|
+---+------+
|  1|     2|
|  4|     5|
+---+------+
df: org.apache.spark.sql.DataFrame = [age: int, height: int ... 1 more field]
df_new: org.apache.spark.sql.DataFrame = [age: int, height: int]

问题2:有没有办法在csv输出文件中添加"\ueff"在写入实际数据之前,使我的编码格式UTF-8-BOM。

答案:

String path= "/data/vaquarkhan/input/unicode.csv";
String outputPath = "file:/data/vaquarkhan/output/output.csv";
getSparkSession()
.read()
.option("inferSchema", "true")
.option("header", "true")
.option("encoding", "UTF-8")
.csv(path)
.write()
.mode(SaveMode.Overwrite)
.csv(outputPath);
}

相关内容

  • 没有找到相关文章

最新更新