Extract hyphen-separated values from a column and apply a UDF



I have a dataframe that looks like this:

+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|sequence|recType|valCode|registerNumber|                rest|        errorCode|errorType |    errorDescription|isSuccessful|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|       9|     11|      0|      XXXX2288|110XXXX2288MKKKKK...|         CHAR0088|     ERROR|Records out of se...|           N|
|       9|     12|      0|      XXXX2288|130XXXX22880011ZZ...|         CHAR0088|     ERROR|Records out of se...|           N|
|       9|     18|      0|      XXXX2288|140XXXX2288      ...|         CHAR0088|     ERROR|Records out of se...|           N|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+

The code below uses UDFs to populate the errorType and errorDescription columns. The UDFs resolveErrorTypeUDF and resolveErrorDescUDF each take an errorCode as input and return the errorType and errorDescription, respectively.

from pyspark.sql.functions import col, trim, when

errorFinalDf = (
    errorDfAll.na.fill("")
    .withColumn("errorType", resolveErrorTypeUDF(col("errorCode")))
    .withColumn("errorDescription", resolveErrorDescUDF(col("errorCode")))
    .withColumn("isSuccessful", when(trim(col("errorCode")).eqNullSafe(""), "Y").otherwise("N"))
    .dropDuplicates()
)

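Note that until now the errorCode column only ever held a single error code. From now on it will hold one or more error codes separated by '-'. I need to look up the errorType and errorDescription for every code and write them to their respective columns, again separated by '-'.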

The new dataframe looks like this:

+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|sequence|recType|valCode|registerNumber|                rest|        errorCode|errorType |    errorDescription|isSuccessful|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|       7|      1|      0|      XXXX8822|010XXXX8822XBCDEF...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     11|      0|      XXXX8822|110XXXX8822LLLLLL...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     12|      0|      XXXX8822|120XXXX8822011GB ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     18|      0|      XXXX8822|180XXXX8822      ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
|       7|     18|      0|      XXXX8822|180XXXX88220     ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...|           N|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+

What changes are needed to handle this new scenario? Please help. Thank you.

You only need minimal changes, and they are limited to your UDFs.

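Suppose you have a plain Python function, get_type_from_code, that converts a string holding a single error code into the corresponding type (the same approach works for descriptions).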

from pyspark.sql import functions as F, types as T

def get_type_from_code(c: str) -> str:
    """Function to convert error code to error type.
    Mind the interface: string in, string out
    """
    return {'CHAR0009': 'ERROR', 'CHAR0021': 'WARNING'}.get(c, 'UNKNOWN')

@F.udf(returnType=T.StringType())
def convert_errcodes_to_types(codes: str) -> str:
    """Convert a string of error codes separated by '-' into a string of types concatenated with '-'"""
    return '-'.join(
        map(get_type_from_code, codes.split('-'))
    )

Done!
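One detail worth checking: after na.fill(""), rows without an error have an empty errorCode, and "".split('-') yields [''], so the UDF above would return 'UNKNOWN' for them. The following is a minimal sketch in plain Python of how the same split/map/join logic could be shared between the type and description lookups while returning an empty string for blank input. The names make_joined_lookup, ERROR_TYPES, ERROR_DESCRIPTIONS, resolve_types and resolve_descriptions are hypothetical, and the description texts are placeholders, not the real messages.

```python
from typing import Callable, Mapping, Optional


def make_joined_lookup(
    mapping: Mapping[str, str], default: str = "UNKNOWN", sep: str = "-"
) -> Callable[[Optional[str]], str]:
    """Build a function that maps every code in a sep-separated string
    through `mapping` and joins the results with the same separator."""

    def lookup(codes: Optional[str]) -> str:
        # None or blank input (e.g. after na.fill("")) maps to an empty string
        if codes is None or not codes.strip():
            return ""
        return sep.join(mapping.get(c.strip(), default) for c in codes.split(sep))

    return lookup


# Hypothetical lookup tables; the description texts are placeholders.
ERROR_TYPES = {"CHAR0009": "ERROR", "CHAR0021": "WARNING"}
ERROR_DESCRIPTIONS = {
    "CHAR0009": "placeholder description for CHAR0009",
    "CHAR0021": "placeholder description for CHAR0021",
}

resolve_types = make_joined_lookup(ERROR_TYPES)
resolve_descriptions = make_joined_lookup(ERROR_DESCRIPTIONS)

# With PySpark available, each function could be wrapped as a UDF, e.g.:
#   resolveErrorTypeUDF = F.udf(resolve_types, T.StringType())
#   resolveErrorDescUDF = F.udf(resolve_descriptions, T.StringType())
```

Wrapped this way, the withColumn calls from the question would not need to change. Keep in mind that if a description itself contains '-', the joined errorDescription can no longer be split back unambiguously, so a different separator may be preferable for that column.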
