I want to validate columns based on whether they contain even a single invalid entry. My constraint is to avoid shuffles and multiple scans, so that this can scale to petabytes.
I validated columns using plain string comparison successfully, but I cannot get it to work with regular expressions. My problem statement is as follows:
| Column 1 | Column 2 | Column 3 | Column 4 | Column 5 |
| --------------| --------------| --------------| --------------| --------------|
|(123)-456-7890 | 123-456-7890 |(123)-456-789 | |(123)-456-7890 |
|(123)-456-7890 | 123-4567890 |(123)-456-7890 |(123)-456-7890 | null |
|(123)-456-7890 | 1234567890 |(123)-456-7890 |(123)-456-7890 | null |
The valid formats are:
(xxx)-xxx-xxxx, xxx-xxx-xxxx, xxx-xxxxxxx and xxxxxxxxxx
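For concreteness, the four formats can be expressed as a single anchored alternation; a quick plain-Python sanity check (standard `re` module, sample values from the tables above) of what should and should not match:

```python
import re

# One pattern covering the four accepted formats:
# (xxx)-xxx-xxxx, xxx-xxx-xxxx, xxx-xxxxxxx, xxxxxxxxxx
PHONE_RE = re.compile(r'\(\d{3}\)-\d{3}-\d{4}|\d{3}-\d{3}-\d{4}|\d{3}-\d{7}|\d{10}')

samples = ['(123)-456-7890', '123-456-7890', '123-4567890',
           '1234567890', '(123)-456-789', 'Messi']
print([bool(PHONE_RE.fullmatch(s)) for s in samples])
# → [True, True, True, True, False, False]
```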
So, the output for the above input should be:
| Column 1 | Column 2 |
| --------------| --------------|
|(123)-456-7890 | 123-456-7890 |
|(123)-456-7890 | 123-4567890 |
|(123)-456-7890 | 1234567890 |
My current code is below:
import regex as re
from pyspark.sql.functions import col, lit
from pyspark.sql.functions import sum as _sum
from pyspark.sql.functions import when
from pyspark.sql import Row
formats = [r'^(?:\(\d{3}\)-)\d{3}-\d{4}$',
           r'^(?:\d{3}-)\d{3}-\d{4}$', r'^(?:\d{3}-)\d{7}$', r'^\d{10}$']
def validate_format(number):
    length = len(number)
    if length == 14:
        if re.match(formats[0], number):
            return True
        return False
    if length == 12:
        if re.match(formats[1], number):
            return True
        return False
    if length == 11:
        if re.match(formats[2], number):
            return True
        return False
    if length == 10:
        if re.match(formats[3], number):
            return True
        return False
    return False
def create_dataframe(spark):
    my_cols = Row("Column1", "Column2", "Column3", "Column4")
    row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
    row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
    row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
    row_seq = [row_1, row_2, row_3]
    df = spark.createDataFrame(row_seq)
    invalid_counts = invalid_counts_in_df(df)
    print(invalid_counts)
def invalid_counts_in_df(df):
    invalid_counts = df.select(
        *[_sum(when(validate_format(col(c)), lit(0)).otherwise(lit(1))).alias(c)
          for c in df.columns]).collect()
    return invalid_counts
This worked when I was comparing against plain strings. Now, however, my function fails with the following error:
>>> create_dataframe(spark)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 8, in create_dataframe
File "<stdin>", line 3, in invalid_counts_in_df
File "<stdin>", line 3, in <listcomp>
File "<stdin>", line 2, in validate_format
TypeError: object of type 'Column' has no len()
I am clearly not using the right approach to validate or invalidate columns efficiently; I know that multiple scans and heavy shuffles are definitely not the way to go.
I hope to find a way to get the columns whose entries are all in a valid format.
Performance-wise, you should always try to use pyspark functions over python functions. Pyspark functions are optimized to use the resources of your cluster, and the data does not need to be converted to python objects.
The pyspark function that fits your use case is rlike. Have a look at the example below:
from pyspark.sql import Row
my_cols = Row("Column1", "Column2", "Column3", "Column4")
row_1 = my_cols('(617)-283-3811', 'Salah', 'Messi', None)
row_2 = my_cols('617-2833811', 'Messi', 'Virgil', 'Messi')
row_3 = my_cols('617-283-3811', 'Ronaldo', 'Messi', 'Ronaldo')
row_seq = [row_1, row_2, row_3]
df = spark.createDataFrame(row_seq)
numberOfRows = df.count()
#I have simplified your regexes a bit because I don't see a reason
#why you need non capturing groups; note the parentheses around the
#alternation, so that ^ and $ anchor every alternative
expr = r"^(\(\d{3}\)-\d{3}-\d{4}|\d{3}-\d{3}-\d{4}|\d{3}-\d{7}|\d{10})$"
#you can also set it to df.columns
columnsToCheck = ['Column1']
columnsToRemove = []
for col in columnsToCheck:
    numberOfMatchingRows = df.filter(df[col].rlike(expr)).count()
    if numberOfMatchingRows < numberOfRows:
        columnsToRemove.append(col)
df = df.select(*[c for c in df.columns if c not in columnsToRemove])
df.show()
Output:
+--------------+-------+-------+-------+
| Column1|Column2|Column3|Column4|
+--------------+-------+-------+-------+
|(617)-283-3811| Salah| Messi| null|
| 617-2833811| Messi| Virgil| Messi|
| 617-283-3811|Ronaldo| Messi|Ronaldo|
+--------------+-------+-------+-------+