from pyspark.sql.functions import *
from pyspark.sql.types import *
我正试图将数据帧转换为df.column.cast(ShortType())
,但当我尝试插入数据99999时,它正在转换为null而没有抛出任何错误,所以你能建议在转换时抛出错误的任何方法吗。
如果强制转换出错,Spark不会抛出。
作为一种捕获这些错误的自定义方法,您可以编写一个UDF,如果您强制转换为null,它就会抛出。不过,这会使脚本的性能恶化,因为Spark无法优化UDF的执行。
如果pyspark.sql.Column.cast
失败,Spark将静默失败,即整个列将变为NULL
。你有几个选择来解决这个问题:
- 如果要在读取文件时检测类型,可以使用预定义(预期(的模式和
mode=failfast
集进行读取,例如:
df = spark.createDataFrame([(1,0,0,2),(1,1,1,1)],['c1','c2','c3','c4'])
df.toPandas().to_csv("./test.csv")
spark.read.csv("./test.csv").show()
+----+---+---+---+---+
| _c0|_c1|_c2|_c3|_c4|
+----+---+---+---+---+
|null| c1| c2| c3| c4|
| 0| 1| 0| 0| 2|
| 1| 1| 1| 1| 1|
+----+---+---+---+---+
运行spark.read.schema("_c0 INT, _c1 INT, _c2 INT, _c3 INT, _c4 INT").option("mode", "failfast").csv("./test.csv").show()
引发:org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST.
,因为_c1
到_c4
中的记录是第一行中的字符串(因为默认情况下header=False
,所以标头被视为行(。这可以在堆栈中找到:Caused by: java.lang.NumberFormatException: For input string: "_c1"
。要进行比较,请运行
spark.read.schema("_c0 INT, c1 INT, c2 INT, c3 INT, c4 INT").option("mode", "ignore").csv("./test.csv").show()
+----+----+----+----+----+
| _c0| c1| c2| c3| c4|
+----+----+----+----+----+
|null|null|null|null|null|
| 0| 1| 0| 0| 2|
| 1| 1| 1| 1| 1|
+----+----+----+----+----+
但会抛出以下警告WARN ParseMode: ignore is not a valid parse mode. Using PERMISSIVE.
- 您的第二个选项是使用UDF(或者更好的是,
pandas_udf
,因为它是矢量化的(。在这里,当您试图将Python/Pandas使用的类型与PySpark使用的JVM类型进行匹配时,您面临着遇到难以调试的类型匹配错误的风险。例如:
import pyspark.sql.functions as f
df2 = spark.createDataFrame([("a",0,0,2),("b",1,1,1)],['c1','c2','c3','c4'])
df2.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
| a| 0| 0| 2|
| b| 1| 1| 1|
+---+---+---+---+
@f.pandas_udf("long")
def my_cast(column):
return column.astype("int64")
df2.select(my_cast(f.col("c1"))).show()
此操作将抛出:ValueError: invalid literal for int() with base 10: 'b'