I am new to Spark. We have legacy Java Spark code that reads data from MySQL. However, it loads all the data on the "master" node and then broadcasts it to the workers according to the partitioned groups. The code looks like this:
// All groups' data is loaded here, on the driver
Map<Integer, ObjectModel> allGroupInputModels = loadAllDataByGroups();
Broadcast<Map<Integer, ObjectModel>> partialModel = sc.broadcast(allGroupInputModels);

Dataset<Row> eventDF = sparkHelper.getEventPresenterDataFrame(groupIds, minDate, maxDate);
Dataset<Row> eventProductDF = sparkHelper.getEventProductDataFrame(groupIds, minDate, maxDate);
JavaPairRDD<Integer, List<Event>> eventsPairRDD =
    sparkHelper.getCombinedEventRDD(eventDF, eventProductDF).repartition(numPartition);

JavaRDD<ResultObject> resultJavaRDD = eventsPairRDD.map(r -> {
    Integer groupId = r._1();
    System.out.println("Processing Group: " + groupId);
    List<Event> groupEvents = r._2();
    // Each task reads the broadcast copy and picks out its group's model
    Map<Integer, ObjectModel> allGroupModel = partialModel.getValue();
    ObjectModel groupModel = allGroupModel.get(groupId);
    groupModel.setEvents(groupEvents);
    // process to get the results using the groupModel
    .....
    return result;
});
Note that we load all the groups' data outside the map function, which I believe means the entire load happens on the master node and is then broadcast to the worker nodes for computation. Is my understanding correct? If so, I am worried the data will grow too large for the master's memory. Is there a way to move this data-loading step to the worker nodes? Any suggestions are appreciated. Thanks.
I don't think there is much to salvage from that code. A classic JDBC ingestion looks like this:
import java.util.Properties;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .appName("MySQL to Dataframe using a JDBC Connection")
    .master("local")
    .getOrCreate();

// Using properties
Properties props = new Properties();
props.put("user", "root");
props.put("password", "Spark<3Java");
props.put("useSSL", "false");

Dataset<Row> df = spark.read().jdbc(
    "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
    "actor", props);
df = df.orderBy(df.col("last_name"));

// Displays the dataframe and some of its metadata
df.show(5);
df.printSchema();
System.out.println("The dataframe contains " + df.count() + " record(s).");
Once you have ingested the data into a dataframe (Dataset<Row>), you can perform all your operations there. The driver does almost nothing here, and no broadcast is needed.
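If the dataset is too big for a single read, the JDBC reader can also split the load across the executors. Below is a minimal sketch of a partitioned read, reusing spark and props from the snippet above; the table name events, the numeric column group_id, and the bounds are assumptions to adjust to your actual schema:

// Partitioned JDBC read: Spark issues one "WHERE group_id >= x AND
// group_id < y" query per partition, and each executor task runs its
// own query, so the full dataset is never materialized on the driver.
Dataset<Row> eventDF = spark.read().jdbc(
    "jdbc:mysql://localhost:3306/sakila?serverTimezone=EST",
    "events",    // assumed table name
    "group_id",  // assumed numeric column to partition on
    1L,          // assumed minimum group_id
    1000L,       // assumed maximum group_id
    16,          // number of partitions, i.e. parallel reads
    props);

From there you can join your model data with the event data on group_id and aggregate per group, all of which runs on the executors rather than on the driver.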
The associated pom.xml contains:
<properties>
  <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  <java.version>1.8</java.version>
  <scala.version>2.12</scala.version>
  <spark.version>3.0.0</spark.version>
  <mysql.version>8.0.16</mysql.version>
  ...
</properties>

<dependencies>
  <!-- Spark -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_${scala.version}</artifactId>
    <version>${spark.version}</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_${scala.version}</artifactId>
    <version>${spark.version}</version>
    <exclusions>
      <exclusion>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>${mysql.version}</version>
  </dependency>
  ...