我有一个csv文件,并希望将其加载到硬盘上的parquet文件中,然后使用spark-sql CLI对其运行SQL查询。是否有一个或两个spark-sql命令可以做到这一点?
如果spark-sql不是正确的方法,那么你会建议什么是最简单的方法来加载csv到parquet?在这一步之后,我的下一个任务是对数据
运行sql查询。package spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, trim}
object csv2parquet extends App {
val spark = SparkSession.builder()
.master("local")
.appName("CSV-Parquet")
.getOrCreate()
import spark.implicits._
val sourceFile = "/<path file>/test.csv" // bad data in file
val targetFile = "/<path file>/testResult.parquet"
// read csv file
val df1 = spark.read.option("header", false).csv(sourceFile)
df1.show(false)
// +-------+-------+----------+-----------+
// |_c0 |_c1 |_c2 |_c3 |
// +-------+-------+----------+-----------+
// |Header |TestApp|2020-01-01|null |
// |name | dept | age | batchDate |
// |john | dept1 | 33 | 2020-01-01|
// |john | dept1 | 33 | 2020-01-01|
// |john | dept1 | 33 | 2020-01-01|
// |john | dept1 | 33 | 2020-01-01|
// |Trailer|count |4 |null |
// +-------+-------+----------+-----------+
// write data to parquet.
df1.write.mode("append").parquet(targetFile)
val resDF = spark.read.parquet(targetFile)
resDF.show(false)
// +-------+-------+----------+-----------+
// |_c0 |_c1 |_c2 |_c3 |
// +-------+-------+----------+-----------+
// |Header |TestApp|2020-01-01|null |
// |name | dept | age | batchDate |
// |john | dept1 | 33 | 2020-01-01|
// |john | dept1 | 33 | 2020-01-01|
// |john | dept1 | 33 | 2020-01-01|
// |john | dept1 | 33 | 2020-01-01|
// |Trailer|count |4 |null |
// +-------+-------+----------+-----------+
// try sql
resDF
.filter(trim(col("_c2")).equalTo(33))
.select(col("_c2"))
.show(false)
// +---+
// |_c2|
// +---+
// | 33|
// | 33|
// | 33|
// | 33|
// +---+
}