使用 Spark 中的 Pushdown 查询,如何在 spark-HBASE(BIGSQL 作为 SQL 引擎)中获得



在Spark中,PushdownQuery由DB的SQL引擎处理,并根据它的结果构造数据帧。 因此,Spark 查询该查询的结果。

val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah )"""
val dbDataFrame = spark.read.format("jdbc")
.option("url", url).option("dbtable", pushdownQuery).option("driver", driver)
.option("numPartitions", 4)
.option("partitionColumn", "COUNTRY_CODE")
.load()

我可以从 spark - mysql 中的另一个引用(https://dzone.com/articles/how-apache-spark-makes-your-slow-mysql-query-10x(中看到,下推查询中的并行性是通过基于参数 numPartitions 和 partitionColumn 触发多个查询来实现的。 这与 sqoop 如何分布的方法非常相似。 比如上面给出的参数的 numPartitions = 4 的例子;partitionColumn = COUNTRY_CODE 在我们的表中COUNTRY_CODE值范围落在 (000,999( 上。

构建 4 个查询;触发到 DB,数据帧从这些查询的结果构造(在本例中并行度为 4(。

Q1 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE >= 000 AND COUNTRY_CODE <= 250
Q2 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 250 AND COUNTRY_CODE  <= 500
Q3 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 500 AND COUNTRY_CODE  <= 750
Q4 : SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah WHERE COUNTRY_CODE > 750 AND COUNTRY_CODE  <= 999

我现在的问题是,如何在 Spark(版本 2.1(+ hbase(查询引擎 - BIGSQL(中使用此方法实现并行性? 它现在没有给我并行性。 桥接火花的驱动程序需要更新吗?还是火花需要这样做?或者什么样的变化有助于它做到这一点? 一些方向对我有帮助。谢谢!

为了获得最佳性能,我建议使用 --num-executors 4 和 --executor-cores 1 开始您的 Spark 作业,因为 jdbc 连接是单线程的,每个查询在一个内核上运行一个任务。通过进行此配置更改,当作业运行时,可以观察到并行运行的任务(每个执行器中的核心(正在使用中。

请改用以下函数:

val connectionProperties: Properties = new Properties
connectionProperties.put("user", "xxxx")
connectionProperties.put("password", "xxxx")
connectionProperties.put("fetchsize", "10000") //fetches 10000 records at once per task
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
connectionProperties
val pushdownQuery = """(SELECT DISTINCT(FLIGHT_NUMBER) blah blah blah ) tbl_alias"""
val dbDataFrame = spark.read.jdbc(url, pushdownQuery, "COUNTRY_CODE", 0L, 4L, 4, connectionProperties)

请参阅 https://spark.apache.org/docs/2.3.0/api/scala/index.html#org.apache.spark.sql.DataFrameReader@jdbc(url:String,table:String,columnName:String,下限:Long,upperBound:Long,numPartitions:Int,connectionProperties:java.util.Properties(:org.apache.spark.sql.DataFrame

最新更新