我正在写数据(大约为1。从一个数据帧到postgresql,它有点慢。写入db需要2.7小时
查看执行器,只有一个活动任务在一个执行器上运行。是否有任何方法我可以并行写入数据库使用Spark中的所有执行器?
...
val prop = new Properties()
prop.setProperty("user", DB_USER)
prop.setProperty("password", DB_PASSWORD)
prop.setProperty("driver", "org.postgresql.Driver")
salesReportsDf.write
.mode(SaveMode.Append)
.jdbc(s"jdbc:postgresql://$DB_HOST:$DB_PORT/$DATABASE", REPORTS_TABLE, prop)
谢谢
所以我找到了问题所在。基本上,重新分区我的数据框架可以使数据库写吞吐量提高100%
def srcTable(config: Config): Map[String, String] = {
val SERVER = config.getString("db_host")
val PORT = config.getInt("db_port")
val DATABASE = config.getString("database")
val USER = config.getString("db_user")
val PASSWORD = config.getString("db_password")
val TABLE = config.getString("table")
val PARTITION_COL = config.getString("partition_column")
val LOWER_BOUND = config.getString("lowerBound")
val UPPER_BOUND = config.getString("upperBound")
val NUM_PARTITION = config.getString("numPartitions")
Map(
"url" -> s"jdbc:postgresql://$SERVER:$PORT/$DATABASE",
"driver" -> "org.postgresql.Driver",
"dbtable" -> TABLE,
"user" -> USER,
"password"-> PASSWORD,
"partitionColumn" -> PARTITION_COL,
"lowerBound" -> LOWER_BOUND,
"upperBound" -> UPPER_BOUND,
"numPartitions" -> NUM_PARTITION
)
}
Spark在使用jdbc编写时也有一个名为"batchsize"的选项。(1000)
connectionProperties.put("batchsize", "100000")
设置更高的值可以加快写入外部数据库的速度。