我正在尝试更新一个大型mysql表(有近100万CCD_行(每行的几个字段。该表没有任何主键(或具有类似UUID的字符串主键(。我没有足够的执行器内存来一次性读取和保存整个数据。有人能告诉我处理这些表格的选择吗。
以下是模式
CREATE TABLE Persons ( Personid varchar(255) NOT NULL, LastName varchar(255) NOT NULL, FirstName varchar(255) DEFAULT NULL, Email varchar(255) DEFAULT NULL, Age int(11) DEFAULT NULL) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
Spark代码类似
SparkSession spark = SparkSession.builder().master("spark://localhost:7077").appName("KMASK").getOrCreate();
DataFrame rawDataFrame = spark.read().format("jdbc").load();
rawDataFrame.createOrReplaceTempView("data");
//encrypt is UDF
String sql = "select Personid, LastName, FirstName, encrypt(Email), Age from data";
Dataset newData = spark.sql(sql);
newData.write().mode(SaveMode.Overwrite).format("jdbc").options(options).save();
该表有大约200万条CCD_记录,数据大小大约为6GB
。我的执行器内存只有2 gb
。我可以使用Spark-jdbc处理这个表吗。
理想情况下,您可以更改spark jdbcfetchsize
选项,以减少/增加每次提取和处理的记录数量。
对数据进行分区也有助于减少混洗和额外的开销。因为Age
是一个数字字段。您还可以处理由"年龄"确定的分区中的数据。首先确定最小和最大年龄,然后使用Spark JDBC选项。
值得注意的是:
partitionColumn
:Age
lowerBound
:您确定的最小年龄500
0:您确定的最大年龄numPartitions
:确实取决于核心和工作节点的数量,但这里有更多提示和链接
您也可以使用自定义查询来仅选择和更新可以使用query
选项保存在内存中的少数记录。注:。使用query
选项时,不应使用dbtable
选项。