我在Spark中有一个ETL作业,该作业也连接到MySQL,以获取一些数据。从历史上看,我一直在做以下操作:
hiveContext.read().jdbc(
dbProperties.getProperty("myDbInfo"),
"(SELECT id, name FROM users) r",
new Properties()).registerTempTable("tmp_users");
Row[] res = hiveContext.sql("SELECT "
+ " u.name, "
+ " SUM(s.revenue) AS revenue "
+ "FROM "
+ " stats s "
+ " INNER JOIN tmp_users u "
+ " ON u.id = s.user_id
+ "GROUP BY "
+ " u.name "
+ "ORDER BY "
+ " revenue DESC
+ "LIMIT 10").collect();
String ids = "";
// now grab me some info for users that are in tmp_user_stats
for (i = 0; i < res.length; i++) {
s += (!s.equals("") ? "," : "") + res[i](0);
}
hiveContext.jdbc(
dbProperties.getProperty("myDbInfo"),
"(SELECT name, surname, home_address FROM users WHERE id IN ("+ids+")) r",
new Properties()).registerTempTable("tmp_users_prises");
但是,当将其缩放到多个工人节点时,每当我使用tmp_users
表时,它都会运行查询,并且每个节点至少执行一次(至少),这将归结为我们的DB管理员,用刀在办公室周围运行。
处理此问题的最佳方法是什么?我可以在3台机器上运行作业,将其限制为3个查询,然后将数据写入Hadoop以供其他节点使用它或什么?
本质上 - 正如评论中所建议的 - 我可以在ETL作业之外运行查询,该查询可以从MySQL侧准备数据并将其导入Hadoop。但是,可能会有随后的查询,它在SPARK和JDBC连接设置的LINE 中提出了更多的解决方案。
我将接受SQOOP解决方案,因为它至少提供了更简化的解决方案,尽管我仍然不确定它是否可以完成工作。如果我找到了一些东西,我将再次编辑问题。
您可以缓存数据:
val initialDF = hiveContext.read().jdbc(
dbProperties.getProperty("myDbInfo"),
"(SELECT id, name FROM users) r",
new Properties())
initialDF.cache();
initialDF.registerTempTable("tmp_users");
第一次读取后,数据将在内存中缓存
替代方案(不伤害dba;))是将sqoop与参数 --num-mappers=3
一起使用,然后将结果文件导入spark