我在集群模式下运行 spark(使用 pyspark(,并通过 JDBC 从 RDBMS 读取数据。 我通过查询(不是直接表格(读取信息
我使用选项进行分区,例如 numPartitions、上限等......
sql = (select ... )
和
df=spark
.read
.jdbc(url=jdbcUrl, table=sql,
properties=connectionProperties, column="brand_id", lowerBound=1,
upperBound=12000,numPartitions=10000 )
不幸的是,Spark 在查询结束时生成了 WHERE 子句上的分区选项,因此 PostGreSQL 在不使用索引的情况下读取了完整的表!
我有一个这样的查询
SELECT "brand_id","brand_name","eq_ref_raw","oe","btimestamp" FROM
( select ... )
tab WHERE brand_id >= 5 AND brand_id < 6
在当前的Spark版本中,你尝试做的事情似乎是不可能的。执行的查询构造如下:
val sqlText = s"SELECT $columnList FROM ${options.table} $myWhereClause"
stmt = conn.prepareStatement(sqlText,
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
(参见org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD#compute(
options.table
对应于属性table
中的 (SELECT ...( 语句。
你能解释一下为什么需要从子查询加载列吗?如果在此子查询中进行联接或其他 SQL 操作,则始终可以"解决方法"并使用 Spark SQL 执行此操作(联接、SQL 操作等(。
编辑:
如您所解释的,使用子查询的原因是 JSONB 提取。显然,它作为SQL本机操作会表现得更好,但是如果你想使用Spark来并行处理,IMO你需要在Spark级别声明你的JSON处理,如下所示:
CREATE TABLE jsonb_test (
content jsonb
);
INSERT INTO jsonb_test (content) VALUES
('{"first_name": "X", "last_name": "Y"}');
代码包括:
val opts = Map("url" -> "jdbc:postgresql://127.0.0.1:5432/spark_jsonb_test",
"dbtable" -> "jsonb_test", "user" -> "root", "password" -> "root",
"driver" -> "org.postgresql.Driver")
val schema = StructType(Seq(
StructField("first_name", StringType, true), StructField("last_name", StringType, true)
))
import sparkSession.implicits._
val personDataFrame = sparkSession.read
.format("jdbc")
.options(opts)
.load()
.withColumn("person", functions.from_json($"content", schema))
val extractedJsonNames = personDataFrame.collect.map(row => row.get(1).toString)
extractedJsonNames should have size 1
extractedJsonNames(0) shouldEqual "[X,Y]"
Spark 在整个PostgresDialect
中支持 JSONB 字段,在其将数据库类型转换为 Catalyst 类型的方法中,将 JSONB 视为StringType
:
private def toCatalystType(
typeName: String,
precision: Int,
scale: Int): Option[DataType] = typeName match {
case "bool" => Some(BooleanType)
case "bit" => Some(BinaryType)
case "int2" => Some(ShortType)
case "int4" => Some(IntegerType)
case "int8" | "oid" => Some(LongType)
case "float4" => Some(FloatType)
case "money" | "float8" => Some(DoubleType)
case "text" | "varchar" | "char" | "cidr" | "inet" | "json" | "jsonb" | "uuid" =>
Some(StringType)