我正在尝试读取spark中的多行csv文件。我的模式是:Id,name和mark。我的输入和实际输出如下。我没有得到预期的产出。有人能帮我解决代码中缺少的问题吗。
代码:
val myMarkDF = spark
.read
.format("csv")
.option("path","mypath\marks.csv")
.option("inferSchema","true")
.option("multiLine","true")
.option("delimiter",",")
.load
输入:
1,A,
97,,
1,A,98
1,A,
99,,
2,B,100
2,B,95
实际输出:
+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
| 1| A|null|
| 97|null|null|
| 1| A| 98|
| 1| A|null|
| 99|null|null|
| 2| B| 100|
| 2| B| 95|
+---+----+----+
预期输出:
+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
| 1| A| 97|
| 1| A| 98|
| 1| A| 99|
| 2| B| 100|
| 2| B| 95|
+---+----+----+
谢谢!
EDIT:一个更好的解决方案,可以处理更多类型的断开记录(在第2列或第3列断开(。重要的部分是计算非空条目的总和,将本应在同一记录中的行分组在一起。
val df = spark.read.csv("file.csv")
df.show
+---+----+----+
|_c0| _c1| _c2|
+---+----+----+
| 1| A|null|
| 97|null|null|
| 1| A| 98|
| 1|null|null| <-- note that I intentionally changed these two rows
| A| 99|null| <-- to demonstrate how to handle two types of broken records
| 2| B| 100|
| 2| B| 95|
+---+----+----+
val df2 = df.withColumn(
"id", monotonically_increasing_id()
).withColumn(
"notnulls",
$"_c0".isNotNull.cast("int") + $"_c1".isNotNull.cast("int") + $"_c2".isNotNull.cast("int")
).withColumn(
"notnulls",
ceil(sum($"notnulls").over(Window.orderBy("id")) / 3)
).groupBy("notnulls").agg(
filter(
flatten(collect_list(array("_c0","_c1","_c2"))),
x => x.isNotNull
).alias("array")
).select(
$"array"(0).alias("c0"),
$"array"(1).alias("c1"),
$"array"(2).alias("c2")
)
df2.show
+---+---+---+
| c0| c1| c2|
+---+---+---+
| 1| A| 97|
| 1| A| 98|
| 1| A| 99|
| 2| B|100|
| 2| B| 95|
+---+---+---+
不太好用的旧答案:
不是解析csv的最佳方式,但至少是您用例的MVP:
val df = sc.wholeTextFiles("marks.csv").map(
row => row._2.replace(",,n", "n").replace(",n", ",").split("n")
).toDF(
"value"
).select(
explode($"value")
).select(
split($"col", ",").as("col")
).select(
$"col"(0), $"col"(1), $"col"(2)
)
df.show
+------+------+------+
|col[0]|col[1]|col[2]|
+------+------+------+
| 1| A| 97|
| 1| A| 98|
| 1| A| 99|
| 2| B| 100|
| 2| B| 95|
+------+------+------+