Spark数据帧通过执行改变



我对Spark相当陌生,所以很可能我在理解上有很大的差距。如果你在这里看到的是非常愚蠢的,提前道歉。所以,我想要实现的是:

  • 从Hive中的表中获取一组行(让我们称之为T_A))并将它们保存在一个DataFrame中(我们称之为DF_A)。
  • 从另一个Hive表(T_B)获取额外信息),并将其与DF_A连接以获得新的数据框(DF_B)。然后缓存它。
val DF_A = sparkSession.sql("select * from T_A where whatever=something").toDF()
val extraData = sparkSession.sql("select * from T_B where whatever=something").toDF()
val DF_B = DF_A.join(extraData,
col(something_else=other_thing), "left"
).toDF().cache()

现在这是我假设Spark + Hive的工作原理类似于普通的java应用程序+ SQL,这是我可能需要一个艰难的过程修正。

  • 在这里,我尝试存储在我之前使用的Hive表之一(T_B),由列X分区,无论我从DF_B转换(Tr1(DF_B)) N行。我使用:
val DF_C = DF_B.map(row => {
Tr1(row)
}).toDF()
DF_C.write.mode(SaveMode.Overwrite).insertInto("T_B")
  • 保存到该表后,我想重用DF_B中的信息(不是在T_B中重新插入的转换后的数据),而是DF_B中基于T_B先前状态的联接数据)对其进行第二次变换(Tr2(DF_B))。
  • 我想用上一步转换的数据更新T_B中相同的N行,使用"INSERT OVERWRITE">
val DF_D = DF_B.map(row => {
Tr2(row)
}).toDF()
DF_D.write.mode(SaveMode.Overwrite).insertInto("T_B")

我的期望:

  • T_B有N行。
  • DF_B不变,N行

发生了什么:

  • DF_B有3*N行
  • T_C有3*N行

现在,经过一些调试,我发现DF_B在DF_C写完成后有3N行。所以DF_B也将有3N行,这将导致T_B也有3*N行。

所以,我的问题是……是否有一种方法可以保留原始DF_B数据并将其用于第二次映射转换,因为它依赖于转换过程的原始DF_B状态?有什么参考资料可以让我知道为什么会这样吗?编辑:我不知道这是否是有用的信息,但我记录记录计数之前和之后做第一次写。得到如下
val DF_C = DF_B.map(row => {
Tr1(row)
}).toDF()
Logger.info("DF_C.count {} - DF_B.count {}"...
DF_C.write.mode(SaveMode.Overwrite).insertInto("T_B")
Logger.info("DF_C.count {} - DF_B.count {}"...

使用persist(MEMORY_AND_DISK)或根本不使用持久化,而不是缓存和3个测试行。我:

DF_C。计数3 - DF_B。数3

DF_C。计数3 - DF_B。数9

使用缓存,我得到:

DF_C。计数3 - DF_B。数3

DF_C。计数9 - DF_B。数9

任何想法?

谢谢你。

在Spark中,执行以惰性方式发生,只有在调用操作时才会执行。因此,当您在同一数据框(在您的情况下是DF_B)上调用某些操作两次时,该数据框(DB_B)将从执行时开始创建和转换两次。

所以尝试在调用第一个动作之前持久化你的数据框架DF_B,然后你可以为Tr1和Tr2使用相同的DF。

持久化后的数据帧将存储在内存/磁盘中,可以多次重用。

您可以在这里了解更多关于持久性的信息