.csv不是选择配置单元查询上的SequenceFile错误



我是Spark和Scala的新手;(

代码摘要:

正在从CSV文件读取数据-->在2个文件上创建简单的内部联接-->将数据写入配置单元表-->提交集群上的作业

你能帮忙确定出了什么问题吗。代码其实并不复杂。该作业在集群上执行良好。所以,当我试图可视化写在配置单元表上的数据时,我面临着一个问题。

hive>从客户限制10中选择*;

失败,出现异常java.io。IOException:java.io.IOException:hdfs://m01.itversity.com:9000/user/itv000666/warehouse/updatedcustomers.db/customers/part-00000-348a54cf-aa0c-45b4-ac49-3a881ae39702_00000.c000.csv不是序列文件

object LapeyreSparkDemo extends App {

//Getting spark ready
val sparkConf = new SparkConf()
sparkConf.set("spark.app.name","Spark for Lapeyre")

//Creating Spark Session
val spark = SparkSession.builder()
.config(sparkConf)
.enableHiveSupport()
.config("spark.sql.warehouse.dir","/user/itv000666/warehouse")
.getOrCreate()                       
Logger.getLogger(getClass.getName).info("Spark Session Created Successfully")

//Reading
Logger.getLogger(getClass.getName).info("Data loading in DF started")
val ordersSchema = "orderid Int, customerName String, orderDate String, custId Int, orderStatus 
String, age String, amount Int" 
val orders2019Df = spark.read
.format("csv")
.option("header",true)
.schema(ordersSchema)
.option("path","/user/itv0006666/lapeyrePoc/orders2019.csv")
.load
val newOrder = orders2019Df.withColumnRenamed("custId", "oldCustId")
.withColumnRenamed("customername","oldCustomerName")

val orders2020Df = spark.read
.format("csv")
.option("header",true)
.schema(ordersSchema)
.option("path","/user/itv000666/lapeyrePoc/orders2020.csv")
.load

Logger.getLogger(getClass.getName).info("Data loading in DF complete")

//processing
Logger.getLogger(getClass.getName).info("Processing Started")
val joinCondition = newOrder.col("oldCustId") === orders2020Df.col("custId")
val joinType = "inner"
val joinData = newOrder.join(orders2020Df, joinCondition, joinType)
.select("custId","customername")

//Writing

spark.sql("create database if not exists updatedCustomers")

joinData.write
.format("csv")
.mode(SaveMode.Overwrite)
.bucketBy(4, "custId")
.sortBy("custId")
.saveAsTable("updatedCustomers.Customers")

//Stopping Spark Session
spark.stop()
}

如果需要更多信息,请告诉我。提前谢谢。

这是的罪魁祸首

joinData.write
.format("csv")

相反,使用了这个,它起了作用。

joinData.write
.format("Hive")

由于我正在将数据写入配置单元表(orc格式(,因此格式应该是">蜂巢";而不是csv

此外,在创建火花会话时,不要忘记启用配置单元支持。此外,在火花2中,bucketby&不支持sortby。也许在Spark 3中是这样。

最新更新