spark.read或spark.sql是惰性转换吗



在Spark中,如果源数据在两次操作调用之间发生了变化,为什么我仍然得到以前的o/p,而不是最近的o/p。通过DAG,一旦调用操作,所有操作都将被执行,包括读取操作。不是吗?

例如。df = spark.sql("select * from dummy.table1")#从有两条记录的火花表读取数据帧。

df.count()#计数为2条记录

现在,在重新运行command1的情况下,将调用插入表和操作中的记录。

df.count()#仍有2条记录。

我原以为Spark会再次执行读取操作,并将总共3条记录提取到数据帧中。

我的理解哪里错了?

为了对比您的断言,下面的内容确实有所不同——使用Databricks Notebook(单元格(。您指示的插入操作未知。

但是以下使用基于parquet或csv的Spark(因此不是Hive表(,确实会随着组成表的文件的更改而导致结果的差异。不过,对于DAG重新计算,使用相同的一组文件。

//1st time in a cell
val df = spark.read.csv("/FileStore/tables/count.txt")
df.write.mode("append").saveAsTable("tab2")
//1st time in another cell
val df2 = spark.sql("select * from tab2")
df2.count() 
//4 is returned

//2nd time in a different cell
val df = spark.read.csv("/FileStore/tables/count.txt")
df.write.mode("append").saveAsTable("tab2")
//2nd time in another cell
df2.count() 
//8 is returned

反驳你的断言。也尝试了.enableHiveSupport(),没有差异。

即使直接在Databricks:中创建配置单元表

spark.sql("CREATE TABLE tab5 (id INT, name STRING, age INT) STORED AS ORC;")
spark.sql(""" INSERT INTO tab5 VALUES (1, 'Amy Smith', 7) """)
...
df.count()
...
spark.sql(""" INSERT INTO tab5 VALUES (2, 'Amy SmithS', 77) """)
df.count()

仍然获取最新计数。

然而,对于Hive创建的ORC Serde表;蜂箱";方法或通过spark.sql:使用插入

val dfX = Seq((88,"John", 888)).toDF("id" ,"name", "age")
dfX.write.format("hive").mode("append").saveAsTable("tab5")
or
spark.sql(""" INSERT INTO tab5 VALUES (1, 'Amy Smith', 7) """)

当仅发出第二个CCD_ 5时,将有时显示并且有时不显示更新的计数。这是由于配置单元/Spark缺乏同步,这可能取决于更改的某些内部标记。在任何情况下都不一致。仔细检查。

在我看来,这与不可变量性最相关。DataFrames是不可变量的,因此原始表中的更改不会反映在它们上。

一旦对数据帧进行了评估,就再也不会对其进行计算。因此,一旦对名为df的数据帧进行了评估,它就是评估时表1的图片,表1是否更改也无所谓,df不会。因此,第二个df.count不会触发评估,它只是返回先前的结果,即2

如果你想要想要想要的结果,你必须在另一个变量中再次加载DF:

val df = spark.sql("select * from dummy.table1")
df.count() //Will trigger evaluation and return 2
//Insert record
val df2 = spark.sql("select * from dummy.table1")
df2.count() //Will trigger evaluation and return 3

或者使用var而不是val(这是坏的(

var df = spark.sql("select * from dummy.table1")
df.count() //Will trigger evaluation and return 2
//Insert record
df = spark.sql("select * from dummy.table1")
df.count() //Will trigger evaluation and return 3

这意味着:是的,spark-read和spark-sql是惰性的,在找到操作之前不会调用它们,但一旦发生这种情况,评估将不会在数据帧中再次触发

最新更新