I need to check in a PySpark DataFrame whether all values are ASCII. I would use the following:
from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

def is_ascii(s):
    if s:
        return all(ord(c) < 128 for c in s)
    else:
        return None

is_ascii_udf = udf(lambda l: is_ascii(l), BooleanType())

df_result = df.select(*map(lambda col: is_ascii_udf(df[col]).alias(col), df.columns))
I am trying to use this on new data with 50 million rows and 9,000 columns, but I get the following error:
ExecutorLostFailure (executor 30 exited caused by one of the running tasks) Reason: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues.
Memory seems to fill up, and I cannot get a bigger cluster, so I want to do the following instead:
import pyspark.sql.functions as F
import pandas as pd
from pyspark.sql.types import *

df = spark.read.parquet(path)

for i in df.columns:
    df = spark.read.parquet(path)
    df_result = df.select(*map(lambda col: is_ascii_udf(df[col]).alias(col), [i]))
    n = df_result.filter(~F.col(i)).count()
    if n > 0:
        print(i, n)
But I get the same error. Why do I still get the same error even though I re-read the DataFrame each time and run the UDF on only one column?
The cluster has 50 GB of memory, 6 cores, and up to 8 workers.
I think the problem is in the function, or in how I am using it.
Regards
Even running it on a single column may be too much for that cluster. In any case, there are Spark SQL functions that do what you want, and they should be more efficient than a Python UDF in both performance and memory.
The code below gives, for each column, either a boolean indicating whether it contains non-ASCII characters or a count of non-ASCII values, and collects the results into a list.
df.createOrReplaceTempView('df')

# True per column if any value contains a non-ASCII character
is_not_ascii = [[col, spark.sql('select max(array_max(transform(split(%s, ""), x -> ascii(x))) >= 128) as is_not_ascii from df' % col).collect()[0][0]] for col in df.columns]
# e.g. [['key', False], ['val', False]]

# Count per column of values containing a non-ASCII character
count_not_ascii = [[col, spark.sql('select sum(cast(array_max(transform(split(%s, ""), x -> ascii(x))) >= 128 as int)) as is_not_ascii from df' % col).collect()[0][0]] for col in df.columns]
# e.g. [['key', 0], ['val', 0]]
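Note that the list comprehensions above launch one Spark job per column, which is 9,000 jobs for your data. As a rough sketch (not part of the original answer), you could also fold all columns into a single aggregation pass; this assumes every column is a string and Spark 2.4+ for the higher-order functions, and the names aggs / non_ascii_cols are just illustrative:

import pyspark.sql.functions as F

# Build one boolean aggregate per column: True if any value has a char >= 128
aggs = [
    F.max(
        F.expr('array_max(transform(split(`%s`, ""), x -> ascii(x))) >= 128' % c)
    ).alias(c)
    for c in df.columns
]

row = df.agg(*aggs).collect()[0]   # single job, single result row
non_ascii_cols = [c for c in df.columns if row[c]]
print(non_ascii_cols)

Whether this fits in memory depends on the driver handling a 9,000-column result row, but it avoids scanning the data once per column.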