数据帧强制转换未引发溢出异常并产生null


from pyspark.sql.functions import *
from pyspark.sql.types import *

我正试图将数据帧转换为df.column.cast(ShortType()),但当我尝试插入数据99999时,它正在转换为null而没有抛出任何错误,所以你能建议在转换时抛出错误的任何方法吗。

如果强制转换出错,Spark不会抛出。

作为一种捕获这些错误的自定义方法,您可以编写一个UDF,如果您强制转换为null,它就会抛出。不过,这会使脚本的性能恶化,因为Spark无法优化UDF的执行。

如果pyspark.sql.Column.cast失败,Spark将静默失败,即整个列将变为NULL。你有几个选择来解决这个问题:

  1. 如果要在读取文件时检测类型,可以使用预定义(预期(的模式和mode=failfast集进行读取,例如:
df = spark.createDataFrame([(1,0,0,2),(1,1,1,1)],['c1','c2','c3','c4'])
df.toPandas().to_csv("./test.csv")
spark.read.csv("./test.csv").show()
+----+---+---+---+---+
| _c0|_c1|_c2|_c3|_c4|
+----+---+---+---+---+
|null| c1| c2| c3| c4|
|   0|  1|  0|  0|  2|
|   1|  1|  1|  1|  1|
+----+---+---+---+---+

运行spark.read.schema("_c0 INT, _c1 INT, _c2 INT, _c3 INT, _c4 INT").option("mode", "failfast").csv("./test.csv").show()引发:org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.,因为_c1_c4中的记录是第一行中的字符串(因为默认情况下header=False,所以标头被视为行(。这可以在堆栈中找到:Caused by: java.lang.NumberFormatException: For input string: "_c1"。要进行比较,请运行

spark.read.schema("_c0 INT, c1 INT, c2 INT, c3 INT, c4 INT").option("mode", "ignore").csv("./test.csv").show()
+----+----+----+----+----+
| _c0|  c1|  c2|  c3|  c4|
+----+----+----+----+----+
|null|null|null|null|null|
|   0|   1|   0|   0|   2|
|   1|   1|   1|   1|   1|
+----+----+----+----+----+

但会抛出以下警告WARN ParseMode: ignore is not a valid parse mode. Using PERMISSIVE.

  1. 您的第二个选项是使用UDF(或者更好的是,pandas_udf,因为它是矢量化的(。在这里,当您试图将Python/Pandas使用的类型与PySpark使用的JVM类型进行匹配时,您面临着遇到难以调试的类型匹配错误的风险。例如:
import pyspark.sql.functions as f
df2 = spark.createDataFrame([("a",0,0,2),("b",1,1,1)],['c1','c2','c3','c4'])
df2.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  0|  0|  2|
|  b|  1|  1|  1|
+---+---+---+---+

@f.pandas_udf("long")
def my_cast(column):
return column.astype("int64")
df2.select(my_cast(f.col("c1"))).show()

此操作将抛出:ValueError: invalid literal for int() with base 10: 'b'

最新更新