我最初使用以下架构将 csv 数据转换为 orc 格式。这是每天发生的拉动。
MySchema = StructType([
StructField("RetailUnit", StringType()),
StructField("RetailUnitSysCode", IntegerType())])
大约一个月后,我在使用其中一列时遇到了一些问题,需要将类型更改为 String,如下所示:
MySchema = StructType([
StructField("RetailUnit", StringType()),
StructField("RetailUnitSysCode", StringType())])
现在,如果我阅读整个数据集并尝试显示:
alloc = spark.read.orc(f"tables/orc/alloc/")
alloc.select('RetailUnitSysCode').show()
我得到一个空指针异常:
An error occurred while calling o2302.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 168.0 failed 4 times, most recent failure: Lost task 0.3 in stage 168.0 (TID 29923, ip-172-31-45-122.ec2.internal, executor 565): java.lang.NullPointerException
at org.apache.spark.sql.execution.datasources.orc.OrcColumnVector.getInt(OrcColumnVector.java:132)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
有没有办法使用新架构读取 orc 数据并填充空值,从而使数据集可用?现在,如果我检查数据帧上的数据类型,我('RetailUnitSysCode', 'int')
如果您事先知道架构,为什么不读取数据帧中的 ORC 文件并使用简单的select
函数调用进行转换。
你可以尝试这样的事情:
import org.apache.spark.sql.functions.col
alloc = spark.read.orc(f"tables/orc/alloc/")
myschemaDef = [("RetailUnit", "string"),"RetailUnitSysCode", "integer")]
columnExprs = [col(elem[0]).as(elem[1]) for elem in myschemaDef]
transformed_df = alloc.select(*columnExprs)
这应该会更改数据帧中的数据类型。当您将其写回HDFS,Hive等持久存储时,应保留这些内容。
注意:*columnExprs中的"*">允许我们解压缩列表。 并且是传递多个参数的常见 Python 列表解包功能。