我正在尝试验证csv文件(每个记录的列数(。根据下面的链接,在Databricks 3.0中有处理它的选项。
http://www.discussbigdata.com/2018/07/capture-bad-records-while-loading-csv.html
df = spark.read
.option("badRecordsPath", "/data/badRecPath")
.parquet("/input/parquetFile")
然而,我使用的是2.3火花版本,无法使用该选项。
在作为pyspark的一部分进行读取时,是否有任何方法可以找出csv文件中的坏记录,并希望将坏记录写入文件。
模式不是静态的,因为我们正在处理多个表的数据,并且不能对模式进行硬编码。
df = spark.read.option("wholeFile", "true").
option("header", "true").
option("quote", """).
csv("${table}/path/to/csv/file")
我不确定您将哪种记录称为坏记录,因为我们看不到您的输入数据。基于我的假设,假设我们有一个下面的输入文件,它有五列。
col1,col2,col3,col4,col5
1,ABC,YYY,101,USA
2,ABC,ZZZ,102,USA
3,ABC,,,USA
4,ABC,GGG,104,USA
5,ABC,PPP,105
第3行具有较少的空列,第5行具有较少列。所以我不想在我的数据帧中加载这两条记录。
PATH_TO_FILE = "file:///user/vikrant/hivespark/userinput"
df = sc.textFile(PATH_TO_FILE)
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))
.map(lambda x: [i for i in x if len(i)!= 0])
.filter(lambda line: len(line) > 4 and line[0] != 'col1')
.toDF(['Col1','Col2','Col3','Col4','Col5'])
>>> df.show();
+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|
+----+----+----+----+----+
| 1| ABC| YYY| 101| USA|
| 2| ABC| ZZZ| 102| USA|
| 4| ABC| GGG| 104| USA|
+----+----+----+----+----+
如果你想从你的输入文件中提取坏记录:
badrecords = sc.textFile(PATH_TO_FILE)
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))
.map(lambda x: [i for i in x if len(i)!= 0])
.filter(lambda line: len(line) < 5 and line[0] != 'col1')
>>> badrecords.take(10)
[['3', 'ABC', 'USA'], ['5', 'ABC', 'PPP', '105']]
让我知道它是否对你有效或有帮助!