Spark (v 2.3.2) 数据帧正在将 ORC 文件中的所有列作为字符串类型读取. 这是正常行为吗?



我有一堆CSV文件,它们使用ETL工具Informatica以ORC格式加载到HDFS中。加载到 HDFS 后,我想提取 ORC 文件的元数据(列名、数据类型(。

但是当我将 ORC 文件加载到 Spark 数据帧中时,所有列都作为string类型进行评估。

示例数据:

ID|Course|Enrol_Date|Credits
123|Biology|21-03-2012 07:34:56|24
908|Linguistics|05-02-2012 11:02:36|15
564|Computer Science|18-03-2012 09:48:09|30
341|Philosophy|23-01-2012 18:12:44|10
487|Math|10-04-2012 17:00:46|20

我正在使用以下命令来实现这一点:

df = sqlContext.sql("SELECT * FROM orc.`<HDFS_path>`");
df.printSchema()

示例输出:

root
|-- ID: string (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: string (nullable = true)

我对Spark和HDFS完全陌生。我试图理解为什么每一列都是string类型的结果。使用 csv 源文件创建 ORC 时,这是正常行为吗(无论我们使用哪种工具执行此操作(?还是我在火花中没有正确做一些导致这种情况的事情?

默认情况下,Spark 将所有字段读取为StringType。您可以尝试以下操作:

为了推断模式,

val data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("<path>.csv")

用于提供自定义架构

import org.apache.spark.sql.types._
val customSchema = StructType(Array(
StructField("col1", StringType, true),
StructField("col2", IntegerType, true),
StructField("col3", DoubleType, true))
)
val data = spark.read.format("csv").option("header", "true").schema(customSchema).load("<path>.csv")

在通过Informatica导入文件时,您应该cast或使用CSVschema。由于SparkORC格式不会像 SparkCSV格式那样自动infer Schema。ORC 格式按原样从源文件架构中获取架构。

由于您没有在Informatica中使用任何schema,因此它默认写入数据String数据类型,ORC进一步采用该数据类型。

有两种可能的方法可以解决问题:

  1. 在 CSV 文件中使用架构(转换应具有的列String( 中的数据类型Informatica/Spark并加载到ORC中。

  2. 或者使用 Spark 中的StructCasting更改所需列ORC文件的数据类型。

示例演示:

下面是 Spark 如何与Schema一起工作的示例演示。您可以类似于源CSV文件架构的逻辑InformaticaSpark给出的相同,如下所示

情况 1:默认加载 CSV 文件并写入 ORC

scala> val df = spark.read.format("csv").option("header","true").load("/spath/stack2.csv")
//Default schema uses by Spark or Informatica for CSV file
scala> df.printSchema
root
|-- ID: string (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: string (nullable = true)
//Have loaded same CSV file into ORC
scala> df.write.format("orc").mode("overwrite").save("/spath/AP_ORC")
scala> val orc = spark.read.format("orc").load("/spath/AP_ORC")
//Schema is same as Source CSV file
scala> orc.printSchema
root
|-- ID: string (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: string (nullable = true)

案例 2:转换/推断 CSV 文件的模式数据类型并写入 ORC

//Inferring Schema or Transform/casting of CSV data in Spark or Informatica respectively. 
scala> val df = spark.read.format("csv").option("header","true").option("inferschema", "true").load("/spath/stack2.csv")
//Transformed Schema
scala> df.printSchema
root
|-- ID: integer (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: integer (nullable = true)
//Have loaded same CSV file into ORC
scala> df.write.format("orc").mode("overwrite").save("/spath/AP_ORC")
scala> val orc = spark.read.format("orc").load("/spath/AP_ORC")
//Schema is same as Source CSV file
scala> orc.printSchema
root
|-- ID: integer (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: integer (nullable = true)

相关内容

最新更新