Spark 中的分区来自 PostgreSQL (JDBC) 的查询



我在集群模式下运行 spark(使用 pyspark(,并通过 JDBC 从 RDBMS 读取数据。 我通过查询(不是直接表格(读取信息

我使用选项进行分区,例如 numPartitions、上限等......

sql = (select ... )

df=spark
.read
.jdbc(url=jdbcUrl, table=sql, 
properties=connectionProperties, column="brand_id", lowerBound=1, 
upperBound=12000,numPartitions=10000 )

不幸的是,Spark 在查询结束时生成了 WHERE 子句上的分区选项,因此 PostGreSQL 在不使用索引的情况下读取了完整的表!

我有一个这样的查询

SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM 
( select  ... ) 
tab WHERE brand_id >= 5 AND brand_id < 6  

在当前的Spark版本中,你尝试做的事情似乎是不可能的。执行的查询构造如下:

val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)

(参见org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#compute(

options.table对应于属性table中的 (SELECT ...( 语句。

你能解释一下为什么需要从子查询加载列吗?如果在此子查询中进行联接或其他 SQL 操作,则始终可以"解决方法"并使用 Spark SQL 执行此操作(联接、SQL 操作等(。


编辑:

如您所解释的,使用子查询的原因是 JSONB 提取。显然,它作为SQL本机操作会表现得更好,但是如果你想使用Spark来并行处理,IMO你需要在Spark级别声明你的JSON处理,如下所示:

CREATE TABLE jsonb_test (
content jsonb
);
INSERT INTO jsonb_test (content) VALUES 
('{"first_name": "X", "last_name": "Y"}');

代码包括:

val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
"dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
"driver" -> "org.postgresql.Driver")
val schema = StructType(Seq(
StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
val personDataFrame = sparkSession.read
.format("jdbc")
.options(opts)
.load()
.withColumn("person", functions.from_json($"content", schema))
val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)
extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"

Spark 在整个PostgresDialect中支持 JSONB 字段,在其将数据库类型转换为 Catalyst 类型的方法中,将 JSONB 视为StringType

private def toCatalystType(
typeName: String,
precision: Int,
scale: Int): Option[DataType] = typeName match {
case "bool" => Some(BooleanType)
case "bit" => Some(BinaryType)
case "int2" => Some(ShortType)
case "int4" => Some(IntegerType)
case "int8" | "oid" => Some(LongType)
case "float4" => Some(FloatType)
case "money" | "float8" => Some(DoubleType)
case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
Some(StringType)

最新更新