How do I validate (and drop) columns based on a regex condition in PySpark without multiple scans and shuffles?



I want to invalidate a column if it contains even a single invalid entry. My constraint is to avoid shuffles and multiple scans so that this scales to petabytes.

I tried validating the columns with plain string comparisons and that worked, but I could not get it to work with regexes. My problem statement is as follows:


| Column 1      | Column 2      | Column 3      | Column 4      | Column 5      |
| --------------| --------------| --------------| --------------| --------------|
|(123)-456-7890 | 123-456-7890  |(123)-456-789  |               |(123)-456-7890 |
|(123)-456-7890 | 123-4567890   |(123)-456-7890 |(123)-456-7890 | null          |
|(123)-456-7890 | 1234567890    |(123)-456-7890 |(123)-456-7890 | null          |

The valid formats are:

(xxx)-xxx-xxxx, xxx-xxx-xxxx, xxx-xxxxxxx and xxxxxxxxxx

So the output for the above input should be:

| Column 1      | Column 2      |
| --------------| --------------| 
|(123)-456-7890 | 123-456-7890  |
|(123)-456-7890 | 123-4567890   |
|(123)-456-7890 | 1234567890    |

My current code is as follows:

import regex as re
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import when
from pyspark.sql import Row
formats = [r'^(?:\(\d{3}\)-)\d{3}-\d{4}$',
           r'^(?:\d{3}-)\d{3}-\d{4}$',
           r'^(?:\d{3}-)\d{7}$',
           r'^\d{10}$']

def validate_format(number):
    length = len(number)
    if length == 14:
        if re.match(formats[0], number):
            return True
        return False
    if length == 12:
        if re.match(formats[1], number):
            return True
        return False
    if length == 11:
        if re.match(formats[2], number):
            return True
        return False
    if length == 10:
        if re.match(formats[3], number):
            return True
        return False
    return False

def create_dataframe(spark):
    my_cols = Row("Column1", "Column2", "Column3", "Column4")
    row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
    row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
    row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
    row_seq = [row_1, row_2, row_3]
    df = spark.createDataFrame(row_seq)
    invalid_counts = invalid_counts_in_df(df)
    print(invalid_counts)

def invalid_counts_in_df(df):
    invalid_counts = df.select(
        *[_sum(when(validate_format(col(c)), lit(0)).otherwise(lit(1))).alias(c) for c in df.columns]).collect()
    return invalid_counts

When I was dealing with plain strings as here, this approach worked. However, my function now returns an error message:

>>> create_dataframe(spark)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 8, in create_dataframe
File "<stdin>", line 3, in invalid_counts_in_df
File "<stdin>", line 3, in <listcomp>
File "<stdin>", line 2, in validate_format
TypeError: object of type 'Column' has no len()
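For context, the error occurs because validate_format is called with a pyspark.sql.Column object while the query plan is being built, not with each row's string value. A minimal, untested sketch of the direct fix would be to wrap the Python function in a udf (the names below are only illustrative); that removes the TypeError but still moves every value into Python:

from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

# Hypothetical wrapper: the udf receives each cell's value (or None) per row,
# so len() and re.match() work; None is treated as invalid here.
validate_udf = udf(lambda s: s is not None and validate_format(s), BooleanType())

def invalid_counts_in_df(df):
    return df.select(
        *[_sum(when(validate_udf(col(c)), lit(0)).otherwise(lit(1))).alias(c)
          for c in df.columns]).collect()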

I am not using the appropriate method to invalidate or validate the columns in the most efficient way. I know that multiple scans and heavy shuffling are definitely not the way to go.

I am hoping to find a way to get the columns whose entries are all in a valid format.

Performance-wise, you should always try to use PySpark functions over plain Python functions. PySpark functions are optimized to make use of the cluster's resources, and the data does not need to be converted to Python objects.

The PySpark function that fits your use case is rlike. Have a look at the example below:

from pyspark.sql import Row
my_cols = Row("Column1", "Column2", "Column3", "Column4")
row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
row_seq = [row_1, row_2, row_3]
df = spark.createDataFrame(row_seq)
numberOfRows = df.count()
# I have simplified your regexes a bit because I don't see a reason
# why you need non-capturing groups
expr = r"^((\(\d{3}\)-\d{3}-\d{4})|(\d{3}-\d{3}-\d{4})|(\d{3}-\d{7})|(\d{10}))$"
# you can also set this to df.columns
columnsToCheck = ['Column1']
columnsToRemove = []
for col in columnsToCheck:
    numberOfMatchingRows = df.filter(df[col].rlike(expr)).count()
    if numberOfMatchingRows < numberOfRows:
        columnsToRemove.append(col)

df = df.select(*[c for c in df.columns if c not in columnsToRemove])
df.show()

Output:

+--------------+-------+-------+-------+
|       Column1|Column2|Column3|Column4|
+--------------+-------+-------+-------+ 
|(617)-283-3811|  Salah|  Messi|   null| 
|   617-2833811|  Messi| Virgil|  Messi| 
|  617-283-3811|Ronaldo|  Messi|Ronaldo| 
+--------------+-------+-------+-------+
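Note that the loop above launches one count() job per checked column, i.e. one scan per column. If the single-scan constraint from the question matters, a sketch that replaces the for loop (untested, combining rlike with the question's own sum/when aggregation) and counts invalid entries for all columns in one aggregation pass could look like this:

from pyspark.sql.functions import sum as _sum, when

# One aggregation job over the DataFrame: for every checked column, count the
# entries that do NOT match the pattern. Nulls never match rlike, so they are
# counted as invalid here.
invalid_counts = df.agg(
    *[_sum(when(df[c].rlike(expr), 0).otherwise(1)).alias(c)
      for c in columnsToCheck]).collect()[0].asDict()

columnsToRemove = [c for c, n in invalid_counts.items() if n > 0]
df = df.select(*[c for c in df.columns if c not in columnsToRemove])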
