我有一个较大的pyspark数据框架。它具有多个类型字符串的列。我将这些列投入到预期的数据类型上。除了代表两个字符代码的公司内部集合的列外,一切似乎都很直截了当。我需要验证列(在集合中),并在无效的情况下将其替换为空。这是我到目前为止所做的:
myDfTyped = myDf.select( myDf.EmployeeKey
, myDf["Amount"].cast("Decimal(10,4)")
, myDf["CountOfDays"].cast("Integer")
, myDf.select('specialCode')
)
特殊代码列表示一组可能的两个字符代码('ab','cd','ef','gh')我需要验证该列包含其中一个代码或在该列中放置一个空。用零替换值匹配如果演员不起作用的预期。
远离用户定义的功能并使用 isin
:
df.where(df["specialCode"].isin(['ab', 'cd', 'ef', 'gh']))
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf
#sample data
specialCode = ['ab', 'cd', 'ef', 'gh']
df = sc.parallelize([
['ab', 10, 1],
['cd', 20, 2],
['ce', 30, 3],
['ef', 0, 4]
]).toDF(['EmployeeKey','Amount','CountOfDays'])
def intersect(val):
return val if val in specialCode else None
intersectUDF = udf(intersect, StringType())
df = df.withColumn("EmployeeKey_converted", intersectUDF(df.EmployeeKey)).drop("EmployeeKey")
您可以在Spark
中创建UDFfrom pyspark.sql.functions import col
from pyspark.sql.functions import udf
def exist(code):
if code in codes:
return code
else:
return null
codes = ['ab', 'cd', 'ef', 'gh']
sqlContext.udf.register("check_code_exists", exist)
data = [{'amount':100,"code":'ab'},{'amount':500,"code":'vb'}]
df = sqlContext.createDataFrame(data)
exist_udf = udf(exist)
df.select('code',exist_udf("code")).show()
输出:
+----+-----------+
|code|exist(code)|
+----+-----------+
| ab| ab|
| vb| null|
+----+-----------+