将MySQL表转换为拼花时出现Spark异常



我正在尝试使用spark 1.6.2将MySQL远程表转换为parquet文件。

进程运行10分钟,填满内存,然后以这些消息开始:

WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(driver,[Lscala.Tuple2;@dac44da,BlockManagerId(driver, localhost, 46158))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval

结尾失败,出现以下错误:

ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkDriverActorSystem-scheduler-1] shutting down ActorSystem [sparkDriverActorSystem]
java.lang.OutOfMemoryError: GC overhead limit exceeded

我用这些命令在spark-shell中运行它:

spark-shell --packages mysql:mysql-connector-java:5.1.26 org.slf4j:slf4j-simple:1.7.21 --driver-memory 12G
val dataframe_mysql = sqlContext.read.format("jdbc").option("url", "jdbc:mysql://.../table").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "...").option("user", "...").option("password", "...").load()
dataframe_mysql.saveAsParquetFile("name.parquet")

我将最大执行器内存限制为12G。是否有一种方法来强制写parquet文件在"小"块释放内存?

问题似乎是在使用jdbc连接器读取数据时没有定义分区。

从JDBC读取默认情况下不是分布式的,因此要启用分布式,必须设置手动分区。你需要一个列,它是一个很好的分区键,你必须事先知道分布。

你的数据看起来是这样的:

root 
|-- id: long (nullable = false) 
|-- order_year: string (nullable = false) 
|-- order_number: string (nullable = false) 
|-- row_number: integer (nullable = false) 
|-- product_code: string (nullable = false) 
|-- name: string (nullable = false) 
|-- quantity: integer (nullable = false) 
|-- price: double (nullable = false) 
|-- price_vat: double (nullable = false) 
|-- created_at: timestamp (nullable = true) 
|-- updated_at: timestamp (nullable = true)

order_year对我来说似乎是一个很好的候选人。(根据你的评论,你似乎有20年的时间)

import org.apache.spark.sql.SQLContext
val sqlContext: SQLContext = ???
val driver: String = ???
val connectionUrl: String = ???
val query: String = ???
val userName: String = ???
val password: String = ???
// Manual partitioning
val partitionColumn: String = "order_year"
val options: Map[String, String] = Map("driver" -> driver,
  "url" -> connectionUrl,
  "dbtable" -> query,
  "user" -> userName,
  "password" -> password,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> "0",
  "upperBound" -> "3000",
  "numPartitions" -> "300"
)
val df = sqlContext.read.format("jdbc").options(options).load()
PS:

partitionColumn, lowerBound, upperBound, numPartitions:如果指定了这些选项中的任何一个,则必须指定所有这些选项。

现在你可以保存你的DataFrame拼花。

相关内容

  • 没有找到相关文章

最新更新