Reading a CSV file in Apache Spark when the values contain the delimiter

What is an efficient way to read a CSV file in Apache Spark when the values in the file contain the delimiter itself?

Here is my dataset:

ID,Name,Age,Add,ress,Salary
1,Ross,32,Ah,med,abad,2000
2,Rachel,25,Delhi,1500
3,Chandler,23,Kota,2000
4,Monika,25,Mumbai,6500
5,Mike,27,Bhopal,8500
6,Phoebe,22,MP,4500
7,Joey,24,Indore,10000

The data needs to be cleaned first, because a DataFrame cannot be built systematically when the delimiter appears an unpredictable number of times inside the text fields.
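To see the problem, a naive read treats every comma as a field separator, so rows with extra commas in the address no longer line up with the six-column header (a minimal sketch, assuming the file is stored at file.csv):

// naive read: each comma is treated as a field separator, so the extra
// commas inside the address shift the remaining fields out of place
val raw = spark.read.option("header", "true").csv("file.csv")
raw.show(false)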

One approach is to move the last column to the front and enclose the original address data in quotes:

import spark.implicits._  // needed for .toDS() (already in scope in spark-shell)

val rdd = sc.textFile("file.csv")
// move the last column (Salary) to the front, so that the field with the
// unpredictable commas (the address) becomes the last field of each line
val rdd2 = rdd.map(s => s.substring(s.lastIndexOf(",") + 1)
  + "," + s.substring(0, s.lastIndexOf(",")))
// wrap everything after the fourth comma in double quotes and make a Dataset[String]
val stringDataset = rdd2.map(s => s.replaceAll("(.*?,.*?,.*?,.*?,|.$)", "$1\"")).toDS()
// create the DataFrame from the quoted lines
val df = spark.read.option("header", "true").csv(stringDataset)

Output of df.show():

+------+---+--------+---+-----------+
|Salary| ID|    Name|Age|   Add,ress|
+------+---+--------+---+-----------+
|  2000|  1|    Ross| 32|Ah,med,abad|
|  1500|  2|  Rachel| 25|      Delhi|
|  2000|  3|Chandler| 23|       Kota|
|  6500|  4|  Monika| 25|     Mumbai|
|  8500|  5|    Mike| 27|     Bhopal|
|  4500|  6|  Phoebe| 22|         MP|
| 10000|  7|    Joey| 24|     Indore|
+------+---+--------+---+-----------+
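If the original column order is wanted back, the moved Salary column can be returned to the end and the re-combined column given a cleaner name (a sketch continuing from the df above; the name Address is my own choice):

// rename the re-combined column and restore the original column order
val tidy = df
  .withColumnRenamed("Add,ress", "Address")
  .select("ID", "Name", "Age", "Address", "Salary")
tidy.show(false)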

Alternatively, if the file itself can be fixed so that any field containing the delimiter is enclosed in double quotes, Spark's CSV reader handles it directly:

{

//  1. read csv:
val df1 = spark.read.option("header", "true").csv(fileFullName)
df1.show(false)
//  2. this works when the input file has the format:
//  ID,Name,Age,Add,ress,Salary
//  1,Ross,32,Ah,"med,abad",2000
//  2,Rachel,25,Delhi,,1500
//  3,Chandler,23,Kota,,2000
//  4,Monika,25,Mumbai,,6500
//  5,Mike,27,Bhopal,,8500
//  6,Phoebe,22,MP,,4500
//  7,Joey,24,Indore,,10000
//  3. result 

//    +---+--------+---+------+--------+------+
//    |ID |Name    |Age|Add   |ress    |Salary|
//    +---+--------+---+------+--------+------+
//    |1  |Ross    |32 |Ah    |med,abad|2000  |
//    |2  |Rachel  |25 |Delhi |null    |1500  |
//    |3  |Chandler|23 |Kota  |null    |2000  |
//    |4  |Monika  |25 |Mumbai|null    |6500  |
//    |5  |Mike    |27 |Bhopal|null    |8500  |
//    |6  |Phoebe  |22 |MP    |null    |4500  |
//    |7  |Joey    |24 |Indore|null    |10000 |
//    +---+--------+---+------+--------+------+

}
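As a side note, writing a repaired DataFrame back out with Spark's CSV writer produces exactly this kind of quoted file, because values that contain the delimiter are quoted automatically on write (a sketch, assuming the df from the first approach and a hypothetical output directory out/clean_csv):

// values containing the delimiter are wrapped in double quotes on write
df.write.option("header", "true").csv("out/clean_csv")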
