我正在使用以下代码读取表:
my_data = sqlContext.read.parquet('hdfs://my_hdfs_path/my_db.db/my_table')
我如何调用Sparksql,因此可以返回类似:
'select col_A, col_B from my_table'
从parquet文件创建数据框后,您必须将其注册为温度表才能在其上运行sql queries
。
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.parquet("src/main/resources/peopleTwo.parquet")
df.printSchema
// after registering as a table you will be able to run sql queries
df.registerTempTable("people")
sqlContext.sql("select * from people").collect.foreach(println)
带有平原SQL
json,orc,parquet和csv文件可以查询,而无需在spark dataframe上创建表。
//This Spark 2.x code you can do the same on sqlContext as well
val spark: SparkSession = SparkSession.builder.master("set_the_master").getOrCreate
spark.sql("select col_A, col_B from parquet.`hdfs://my_hdfs_path/my_db.db/my_table`")
.show()
假设您的hdfs中的parquet文件 ventas4 :
hdfs://localhost:9000/sistestion/sql/ventas4
在这种情况下,这些步骤为:
-
充电SQL上下文:
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-
读取镶木quet文件:
val ventas=sqlContext.read.parquet("hdfs://localhost:9000/sistgestion/sql/ventas4")
-
注册一个时间表:
ventas.registerTempTable("ventas")
-
执行查询(在此行中,您可以使用tojson通过JSON格式,也可以使用Collect()):
sqlContext.sql("select * from ventas").toJSON.foreach(println(_)) sqlContext.sql("select * from ventas").collect().foreach(println(_))
在Intellij中使用以下代码:
def groupPlaylistIds(): Unit ={
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder.appName("FollowCount")
.master("local[*]")
.getOrCreate()
val sc = spark.sqlContext
val d = sc.read.format("parquet").load("/Users/CCC/Downloads/pq/file1.parquet")
d.printSchema()
val d1 = d.select("col1").filter(x => x!='-')
val d2 = d1.filter(col("col1").startsWith("searchcriteria"));
d2.groupBy("col1").count().sort(col("count").desc).show(100, false)
}