我在hdfs中有一个文件,用逗号(,)
分隔,我正在尝试使用scala提取第6列,我在下面写了代码
object WordCount {
def main(args: Array[String])
{
val textfile = sc.textFile("/user/cloudera/xxx/xxx")
val word = textfile.filter( x => x.length > 0 ).map(_.replaceAll("\|",",").trim)
val keys = word.map(a => a(5))
keys.saveAsTextFile("/user/cloudera/xxx/Sparktest")
}
}
但我在HDFS中得到的结果并不是我想要的。
以前我的数据是:
MSH|^~&|RQ|BIN|SMS|BIN|2019||ORU^R01|120330003918|J|2.2
PID|1|xxxxx|xxxx||TEST|Rooney|19761202|M|MR^^M^MR^MD^11|7|0371 HOES LANE^0371
现在我的数据是:
T
I
,
1
N
T
I
,
1
N
T
I
我希望我的结果是:
BIN
TEST
我不知道我做错了什么。请帮助
您正在用,
替换|
,但您没有用逗号分隔,因此word
仍然具有类型RDD[String]
,而不是您所期望的RDD[Array[String]]
。然后,a => a(5)
将每个字符串视为字符的数组,从而得到您所看到的结果。
不知道为什么你首先要用逗号替换管道,你可以:
val word = textfile.filter(x => x.length > 0).map(_.split('|'))
val keys = word.map(a => a(5).trim)
使用'split()'函数!
val s="MSH|^~\&|RQ|BIN|SMS|BIN|2019||ORU^R01|120330003918|J|2.2"
// WRONG
s.replaceAll("\|",",")(5)
res3: Char = ~
// RIGHT
s.split("\|")(5)
res4: String = BIN
在spark 2.0中,现在您有了csv读取器,因此您可以按照以下简单地加载csv
val baseDS=spark.read.option("header","true").csv(filePath)
baseDS.show()
您可以通过以下简单地选择列的名称
val selectCol = baseDS.select(ColumnName)