我试图从 mysql 读取数据并将其写回 s3 中的镶木地板文件,具有特定分区,如下所示:
df=sqlContext.read.format('jdbc')
.options(driver='com.mysql.jdbc.Driver',url="""jdbc:mysql://<host>:3306/<>db?user=<usr>&password=<pass>""",
dbtable='tbl',
numPartitions=4 )
.load()
df2=df.withColumn('updated_date',to_date(df.updated_at))
df2.write.parquet(path='s3n://parquet_location',mode='append',partitionBy=['updated_date'])
我的问题是它只打开一个到 mysql 的连接(而不是 4 个),并且在从 mysql 获取所有数据之前它不会写入 parquert,因为我在 mysql 中的表很大(100M 行),进程在 OutOfMemory 上失败。
有没有办法配置 Spark 以打开与 mysql 的多个连接并将部分数据写入 parquet?
您应该设置以下属性:
partitionColumn,
lowerBound,
upperBound,
numPartitions
正如这里记录的那样:http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
对于 Spark>= 2.0我创建了一个包含以下方法的类:
...
private val dbUrl =
s"""jdbc:mysql://${host}:${port}/${db_name}
|?zeroDateTimeBehavior=convertToNull
|&read_buffer_size=100M""".stripMargin.replace("n", "")
def run(sqlQuery: String): DataFrame = {
println(sqlQuery)
Datapipeline.spark.read
.format("jdbc")
.option("driver", "com.mysql.jdbc.Driver")
.option("url", dbUrl)
.option("user", user)
.option("password", pass)
.option("dbtable", s"($sqlQuery) as tmp")
.load()
}
...
def getBounds(table: String, whereClause: String, partitionColumn: String): Array[Int] = {
val sql = s"select min($partitionColumn) as min, max($partitionColumn) as max from $table${
if (whereClause.length > 0) s" where $whereClause"
}"
val df = run(sql).collect()(0)
Array(df.get(0).asInstanceOf[Int], df.get(1).asInstanceOf[Int])
}
def getTableFields(table: String): String = {
val sql =
s"""
|SELECT *
|FROM information_schema.COLUMNS
|WHERE table_name LIKE '$table'
| AND TABLE_SCHEMA LIKE '${db_name}'
|ORDER BY ORDINAL_POSITION
""".stripMargin
run(sql).collect().map(r => r.getAs[String]("COLUMN_NAME")).mkString(", ")
}
/**
* Returns DataFrame partitioned by <partritionColumn> to number of partitions provided in
* <numPartitions> for a <table> with WHERE clause
* @param table - a table name
* @param whereClause - WHERE clause without "WHERE" key word
* @param partitionColumn - column name used for partitioning, should be numeric
* @param numPartitions - number of partitions
* @return - a DataFrame
*/
def run(table: String, whereClause: String, partitionColumn: String, numPartitions: Int): DataFrame = {
val bounds = getBounds(table, whereClause, partitionColumn)
val fields = getTableFields(table)
val dfs: Array[DataFrame] = new Array[DataFrame](numPartitions)
val lowerBound = bounds(0)
val partitionRange: Int = ((bounds(1) - bounds(0)) / numPartitions)
for (i <- 0 to numPartitions - 2) {
dfs(i) = run(
s"""select $fields from $table
| where $partitionColumn >= ${lowerBound + (partitionRange * i)} and $partitionColumn < ${lowerBound + (partitionRange * (i + 1))}${
if (whereClause.length > 0)
s" and $whereClause"
}
""".stripMargin.replace("n", ""))
}
dfs(numPartitions - 1) = run(s"select $fields from $table where $partitionColumn >= ${lowerBound + (partitionRange * (numPartitions - 1))}${
if (whereClause.length > 0)
s" and $whereClause"
}".replace("n", ""))
dfs.reduceLeft((res, df) => res.union(df))
}
最后run
方法将创建许多必要的分区。调用操作方法时,Spark 将创建与为 run
方法返回的数据帧定义的分区数一样多的并行任务。
享受。