我有一个dataframe
,如下所示:
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|sequence|recType|valCode|registerNumber| rest| errorCode|errorType | errorDescription|isSuccessful|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
| 9| 11| 0| XXXX2288|110XXXX2288MKKKKK...| CHAR0088| ERROR|Records out of se...| N|
| 9| 12| 0| XXXX2288|130XXXX22880011ZZ...| CHAR0088| ERROR|Records out of se...| N|
| 9| 18| 0| XXXX2288|140XXXX2288 ...| CHAR0088| ERROR|Records out of se...| N|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+ N|
下面的代码使用UDF
填充errorType
和errorDescription
列的数据。UDFs
即resolveErrorTypeUDF
和resolveErrorDescUDF
以一个errorCode
作为输入,分别输出errorType
和errorDescription
。
errorFinalDf = errorDfAll.na.fill("")
.withColumn("errorType", resolveErrorTypeUDF(col("errorCode")))
.withColumn("errorDescription", resolveErrorDescUDF(col("errorCode")))
.withColumn("isSuccessful", when(trim(col("errorCode")).eqNullSafe(""), "Y").otherwise("N"))
.dropDuplicates()
请注意,我过去只在errorCode
列中得到一个error code
。现在,我将在errorCode
列中获得单个/多个-
分离的error codes
。我需要填充所有的映射errorType
和errorDescription
,并以-
分隔将它们写入各自的列。
新的dataframe
看起来像这样。
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
|sequence|recType|valCode|registerNumber| rest| errorCode|errorType | errorDescription|isSuccessful|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
| 7| 1| 0| XXXX8822|010XXXX8822XBCDEF...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...| N|
| 7| 11| 0| XXXX8822|110XXXX8822LLLLLL...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...| N|
| 7| 12| 0| XXXX8822|120XXXX8822011GB ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...| N|
| 7| 18| 0| XXXX8822|180XXXX8822 ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...| N|
| 7| 18| 0| XXXX8822|180XXXX88220 ...|CHAR0009-CHAR0021|ERROR-WARN|Short Failed-Miss...| N|
+--------+-------+-------+--------------+--------------------+-----------------+----------+--------------------+------------+
需要做哪些更改来适应新的场景。请帮助。谢谢你。
您需要最小的更改,仅限于您的UDFs
。
假设您有一个简单的python函数,get_type_from_code
能够将带有错误代码的字符串转换为相应的类型(同样适用于描述)。
from pyspark.sql import functions as F, types as T
def get_type_from_code(c: str) -> str:
"""Function to convert error code to error type.
Mind the interface: string in, string out
"""
return {'CHAR0009': 'ERROR', 'CHAR0021': 'WARNING'}.get(c, 'UNKNOWN')
@F.udf(returnType=T.StringType())
def convert_errcodes_to_types(codes: str) -> str:
"""Convert a string of error codes separated by '-' into a string of types concatenated with '-'"""
return '-'.join(
map(get_type_from_code, codes.split('-'))
)
完成了!