验证CSV文件PySpark



我正在尝试验证csv文件(每个记录的列数(。根据下面的链接,在Databricks 3.0中有处理它的选项。

http://www.discussbigdata.com/2018/07/capture-bad-records-while-loading-csv.html

df = spark.read
.option("badRecordsPath", "/data/badRecPath")
.parquet("/input/parquetFile")

然而,我使用的是2.3火花版本,无法使用该选项。

在作为pyspark的一部分进行读取时,是否有任何方法可以找出csv文件中的坏记录,并希望将坏记录写入文件。

模式不是静态的,因为我们正在处理多个表的数据,并且不能对模式进行硬编码。

df = spark.read.option("wholeFile", "true"). 
option("header", "true"). 
option("quote", """). 
csv("${table}/path/to/csv/file")

我不确定您将哪种记录称为坏记录,因为我们看不到您的输入数据。基于我的假设,假设我们有一个下面的输入文件,它有五列。

col1,col2,col3,col4,col5
1,ABC,YYY,101,USA
2,ABC,ZZZ,102,USA
3,ABC,,,USA
4,ABC,GGG,104,USA
5,ABC,PPP,105

第3行具有较少的空列,第5行具有较少列。所以我不想在我的数据帧中加载这两条记录。

PATH_TO_FILE = "file:///user/vikrant/hivespark/userinput"
df = sc.textFile(PATH_TO_FILE)
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))
.map(lambda x: [i for i in x if len(i)!= 0]) 
.filter(lambda line: len(line) > 4 and line[0] != 'col1') 
.toDF(['Col1','Col2','Col3','Col4','Col5'])

>>> df.show();
+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|
+----+----+----+----+----+
|   1| ABC| YYY| 101| USA|
|   2| ABC| ZZZ| 102| USA|
|   4| ABC| GGG| 104| USA|
+----+----+----+----+----+

如果你想从你的输入文件中提取坏记录:

badrecords = sc.textFile(PATH_TO_FILE)
.mapPartitions(lambda line: csv.reader(line,delimiter=',', quotechar='"'))
.map(lambda x: [i for i in x if len(i)!= 0]) 
.filter(lambda line: len(line) < 5 and line[0] != 'col1')
>>> badrecords.take(10)
[['3', 'ABC', 'USA'], ['5', 'ABC', 'PPP', '105']]

让我知道它是否对你有效或有帮助!

最新更新