What is an efficient way to read a CSV file in Apache Spark when the values contain the delimiter itself?
Here is my dataset:
ID,Name,Age,Add,ress,Salary
1,Ross,32,Ah,med,abad,2000
2,Rachel,25,Delhi,1500
3,Chandler,23,Kota,2000
4,Monika,25,Mumbai,6500
5,Mike,27,Bhopal,8500
6,Phoebe,22,MP,4500
7,Joey,24,Indore,10000
The data needs cleaning first: when the delimiter can also appear inside a text value, the file cannot be parsed into a DataFrame systematically.
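To see the failure, a minimal sketch (assuming the data above is saved as file.csv and Spark's default PERMISSIVE parse mode): a naive read splits on every comma, so rows with an embedded delimiter spill into the wrong columns.
val naive = spark.read.option("header", "true").csv("file.csv")
naive.show()
// row 1 parses as ID=1, Name=Ross, Age=32, Add=Ah, ress=med, Salary=abad,
// and the real salary (2000) is silently dropped as an extra token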
One approach is to move the last column to the front and enclose the original address data in quotes:
// spark-shell style; toDS() needs the implicits that the shell imports for you
val rdd = sc.textFile("file.csv")
// move the last column (Salary) to the front, so the only field that can
// contain commas (the address) becomes the trailing field
val rdd2 = rdd.map(s => s.substring(s.lastIndexOf(",") + 1)
  + "," + s.substring(0, s.lastIndexOf(",")))
// enclose everything after the 4th comma (the original address) in quotes
// and turn the lines into a Dataset[String]
val stringDataset = rdd2.map(s => s.replaceAll("(.*?,.*?,.*?,.*?,|.$)", "$1\"")).toDS()
// parse the quoted lines as CSV
val df = spark.read.option("header", "true").csv(stringDataset)
df.show()
Output:
+------+---+--------+---+-----------+
|Salary| ID| Name|Age| Add,ress|
+------+---+--------+---+-----------+
| 2000| 1| Ross| 32|Ah,med,abad|
| 1500| 2| Rachel| 25| Delhi|
| 2000| 3|Chandler| 23| Kota|
| 6500| 4| Monika| 25| Mumbai|
| 8500| 5| Mike| 27| Bhopal|
| 4500| 6| Phoebe| 22| MP|
| 10000| 7| Joey| 24| Indore|
+------+---+--------+---+-----------+
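The same idea works without rewriting the lines as CSV text: since only the address can contain commas, and it sits between three comma-free leading fields and one comma-free trailing field, the columns can be extracted with a regex. A sketch under those assumptions (the file name file.csv is again assumed):
import org.apache.spark.sql.functions.{col, regexp_extract}

val lines = spark.read.text("file.csv").filter(!col("value").startsWith("ID,"))
// three comma-free fields, a greedy middle group (the address), one comma-free tail
val p = "^([^,]*),([^,]*),([^,]*),(.*),([^,]*)$"
val df2 = lines.select(
  regexp_extract(col("value"), p, 1).as("ID"),
  regexp_extract(col("value"), p, 2).as("Name"),
  regexp_extract(col("value"), p, 3).as("Age"),
  regexp_extract(col("value"), p, 4).as("Address"),
  regexp_extract(col("value"), p, 5).as("Salary"))
df2.show(false)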
Alternatively, fix the data at the source. If the file can be produced with the embedded delimiter quoted (and an empty field where ress is absent), a plain CSV read handles it directly:
// 1. when the file has this format:
// ID,Name,Age,Add,ress,Salary
// 1,Ross,32,Ah,"med,abad",2000
// 2,Rachel,25,Delhi,,1500
// 3,Chandler,23,Kota,,2000
// 4,Monika,25,Mumbai,,6500
// 5,Mike,27,Bhopal,,8500
// 6,Phoebe,22,MP,,4500
// 7,Joey,24,Indore,,10000
// 2. read csv:
val df1 = spark.read.option("header", "true").csv(fileFullName)
df1.show(false)
// 3. result:
// +---+--------+---+------+--------+------+
// |ID |Name |Age|Add |ress |Salary|
// +---+--------+---+------+--------+------+
// |1 |Ross |32 |Ah |med,abad|2000 |
// |2 |Rachel |25 |Delhi |null |1500 |
// |3 |Chandler|23 |Kota |null |2000 |
// |4 |Monika |25 |Mumbai|null |6500 |
// |5 |Mike |27 |Bhopal|null |8500 |
// |6 |Phoebe |22 |MP |null |4500 |
// |7 |Joey |24 |Indore|null |10000 |
// +---+--------+---+------+--------+------+
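A caveat with this result: the address is still split across the Add and ress columns. Since concat_ws skips null values, merging them back is straightforward (a sketch building on df1 above):
import org.apache.spark.sql.functions.{col, concat_ws}

val merged = df1
  .withColumn("Address", concat_ws(",", col("Add"), col("ress")))
  .drop("Add", "ress")
merged.show(false)
// Ross's address is "Ah,med,abad" again; rows with a null ress keep their
// single-field address unchanged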