我正在使用读取一个具有页眉、内容和页脚的文本(而不是CSV)文件
spark.read.format("text").option("delimiter","|")...load(file)
我可以使用df.first()
访问标头。有接近df.last()
或df.reverse().first()
的东西吗?
样本数据:
col1|col2|col3
100|hello|asdf
300|hi|abc
200|bye|xyz
800|ciao|qwerty
This is the footer line
处理逻辑:
#load text file
txt = sc.textFile("path_to_above_sample_data_text_file.txt")
#remove header
header = txt.first()
txt = txt.filter(lambda line: line != header)
#remove footer
txt = txt.map(lambda line: line.split("|"))
.filter(lambda line: len(line)>1)
#convert to dataframe
df=txt.toDF(header.split("|"))
df.show()
输出为:
+----+-----+------+
|col1| col2| col3|
+----+-----+------+
| 100|hello| asdf|
| 300| hi| abc|
| 200| bye| xyz|
| 800| ciao|qwerty|
+----+-----+------+
假设文件不是那么大,我们可以使用collect来获取作为迭代器的数据帧,并访问最后一个元素,如下所示:
df = df.collect()[data.count()-1]
避免在大型数据集上使用CCD_ 4。
或
我们可以用take截去最后一排。
df = df.take(data.count()-1)
假设您的文本文件具有JSON头和页脚,Spark SQL方式,
样本数据
{"":[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]}
这里可以通过以下3行来避免标题(假设数据中没有Tilda),
jsonToCsvDF=spark.read.format("com.databricks.spark.csv").option("delimiter", "~").load(<Blob Path1/ ADLS Path1>)
jsonToCsvDF.createOrReplaceTempView("json_to_csv")
spark.sql("SELECT SUBSTR(`_c0`,5,length(`_c0`)-5) FROM json_to_csv").coalesce(1).write.option("header",false).mode("overwrite").text(<Blob Path2/ ADLS Path2>)
现在输出看起来像
[{<field_name>:<field_value1>},{<field_name>:<field_value2>}]
除了上面的答案外,对于具有multiple
、header
和footer
行的文件,在solution fits good
下面:-
val data_delimiter = "|"
val skipHeaderLines = 5
val skipHeaderLines = 3
//-- Read file into Dataframe and convert to RDD
val dataframe = spark.read.option("wholeFile", true).option("delimiter",data_delimiter).csv(s"hdfs://$in_data_file")
val rdd = dataframe.rdd
//-- RDD without header and footer
val dfRdd = rdd.zipWithIndex().filter({case (line, index) => index != (cnt - skipFooterLines) && index > (skipHeaderLines - 1)}).map({case (line, index) => line})
//-- Dataframe without header and footer
val df = spark.createDataFrame(dfRdd, dataframe.schema)