使用Spark选择特定列



我在hdfs中有一个文件,用逗号(,)分隔,我正在尝试使用scala提取第6列,我在下面写了代码

object WordCount {
 def main(args: Array[String])
 {
 val textfile = sc.textFile("/user/cloudera/xxx/xxx")
 val word = textfile.filter( x => x.length >  0 ).map(_.replaceAll("\|",",").trim)
 val keys = word.map(a => a(5))
 keys.saveAsTextFile("/user/cloudera/xxx/Sparktest")
 }
}

但我在HDFS中得到的结果并不是我想要的。

以前我的数据是:

MSH|^~&|RQ|BIN|SMS|BIN|2019||ORU^R01|120330003918|J|2.2
PID|1|xxxxx|xxxx||TEST|Rooney|19761202|M|MR^^M^MR^MD^11|7|0371 HOES LANE^0371

现在我的数据是:


T
I
,
1
N

T
I
,
1
N

T
I

我希望我的结果是:

BIN
TEST 

我不知道我做错了什么。请帮助

您正在用,替换|,但您没有用逗号分隔,因此word仍然具有类型RDD[String],而不是您所期望的RDD[Array[String]]。然后,a => a(5)将每个字符串视为字符的数组,从而得到您所看到的结果。

不知道为什么你首先要用逗号替换管道,你可以:

val word = textfile.filter(x => x.length >  0).map(_.split('|'))
val keys = word.map(a => a(5).trim)

使用'split()'函数!

val s="MSH|^~\&|RQ|BIN|SMS|BIN|2019||ORU^R01|120330003918|J|2.2"
// WRONG
s.replaceAll("\|",",")(5)   
res3: Char = ~
// RIGHT
s.split("\|")(5) 
res4: String = BIN

在spark 2.0中,现在您有了csv读取器,因此您可以按照以下简单地加载csv

val baseDS=spark.read.option("header","true").csv(filePath)
  baseDS.show()

您可以通过以下简单地选择列的名称

val selectCol = baseDS.select(ColumnName)

最新更新