我有一堆CSV文件,它们使用ETL工具Informatica以ORC格式加载到HDFS中。加载到 HDFS 后,我想提取 ORC 文件的元数据(列名、数据类型(。
但是当我将 ORC 文件加载到 Spark 数据帧中时,所有列都作为string
类型进行评估。
示例数据:
ID|Course|Enrol_Date|Credits
123|Biology|21-03-2012 07:34:56|24
908|Linguistics|05-02-2012 11:02:36|15
564|Computer Science|18-03-2012 09:48:09|30
341|Philosophy|23-01-2012 18:12:44|10
487|Math|10-04-2012 17:00:46|20
我正在使用以下命令来实现这一点:
df = sqlContext.sql("SELECT * FROM orc.`<HDFS_path>`");
df.printSchema()
示例输出:
root
|-- ID: string (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: string (nullable = true)
我对Spark和HDFS完全陌生。我试图理解为什么每一列都是string
类型的结果。使用 csv 源文件创建 ORC 时,这是正常行为吗(无论我们使用哪种工具执行此操作(?还是我在火花中没有正确做一些导致这种情况的事情?
默认情况下,Spark 将所有字段读取为StringType。您可以尝试以下操作:
为了推断模式,
val data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("<path>.csv")
用于提供自定义架构
import org.apache.spark.sql.types._
val customSchema = StructType(Array(
StructField("col1", StringType, true),
StructField("col2", IntegerType, true),
StructField("col3", DoubleType, true))
)
val data = spark.read.format("csv").option("header", "true").schema(customSchema).load("<path>.csv")
在通过Informatica导入文件时,您应该cast
或使用CSV
schema
。由于SparkORC
格式不会像 SparkCSV
格式那样自动infer Schema
。ORC 格式按原样从源文件架构中获取架构。
由于您没有在Informatica中使用任何schema
,因此它默认写入数据String
数据类型,ORC
进一步采用该数据类型。
有两种可能的方法可以解决问题:
-
在 CSV 文件中使用架构(转换应具有的列
String
( 中的数据类型Informatica
/Spark
并加载到ORC
中。 -
或者使用 Spark 中的
Struct
或Casting
更改所需列ORC
文件的数据类型。
示例演示:
下面是 Spark 如何与Schema
一起工作的示例演示。您可以类似于源CSV
文件架构的逻辑Informatica
与Spark
给出的相同,如下所示
情况 1:默认加载 CSV 文件并写入 ORC
scala> val df = spark.read.format("csv").option("header","true").load("/spath/stack2.csv")
//Default schema uses by Spark or Informatica for CSV file
scala> df.printSchema
root
|-- ID: string (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: string (nullable = true)
//Have loaded same CSV file into ORC
scala> df.write.format("orc").mode("overwrite").save("/spath/AP_ORC")
scala> val orc = spark.read.format("orc").load("/spath/AP_ORC")
//Schema is same as Source CSV file
scala> orc.printSchema
root
|-- ID: string (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: string (nullable = true)
案例 2:转换/推断 CSV 文件的模式数据类型并写入 ORC
//Inferring Schema or Transform/casting of CSV data in Spark or Informatica respectively.
scala> val df = spark.read.format("csv").option("header","true").option("inferschema", "true").load("/spath/stack2.csv")
//Transformed Schema
scala> df.printSchema
root
|-- ID: integer (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: integer (nullable = true)
//Have loaded same CSV file into ORC
scala> df.write.format("orc").mode("overwrite").save("/spath/AP_ORC")
scala> val orc = spark.read.format("orc").load("/spath/AP_ORC")
//Schema is same as Source CSV file
scala> orc.printSchema
root
|-- ID: integer (nullable = true)
|-- Course: string (nullable = true)
|-- Enrol_Date: string (nullable = true)
|-- Credits: integer (nullable = true)