使用列表/元组过滤数据帧时spark广播变量的相关性



假设我有一个用例,我需要使用python列表中的值来子集spark数据帧。我不明白如果我使用普通python列表会有什么不同广播变量来完成这个任务。

,

假设我有一个数据帧df有两列
df = spark.createDataFrame([('1','A'),('2','B'),('3','C'),('4','D')],["num","alpha"])
df.show(5)
+---+-----+
|num|alpha|
+---+-----+
|  1|    A|
|  2|    B|
|  3|    C|
|  4|    D|
+---+-----+

我想根据值列表['A','B']过滤它。这个列表可以非常小也可以非常大。我可以通过两种方式做到这一点。

方法1:

使用python列表过滤

from pyspark.sql.functions import col
list_filter = ['A','B']
df.filter(col("alpha").isin(list_filter)).show(5)

方法2:

使用spark广播变量过滤

from pyspark.sql.functions import col
broadcast_filter = sc.broadcast(['A','B'])
df.filter(col("alpha").isin(broadcast_filter.value)).show(5)

结果:

Spark catalyst optimizer将这两个查询转换为相同的物理计划

== Physical Plan ==
*(1) Filter alpha#784 IN (A,B)
+- Scan ExistingRDD[num#783,alpha#784]

事实上,type(broadcast_filter.value)告诉我们这是一个正常的python列表。

催化剂优化器直接在== Physical Plan ==中解析该列表的值并创建DAG。所以,只有驱动程序处理这个列表。执行器甚至不需要使用它,因为这些值已经存在于查询计划中。

问题:

  1. 那么,如果它是一个普通的python列表或广播变量,因为这些值直接用于物理计划,这真的很重要吗?依我看,与使用python列表相比,广播会降低性能。
  2. 除了自定义udf之外,是否存在广播变量比普通变量更能提高查询性能的情况?

广播变量允许程序员在每台机器上缓存一个只读变量,而不是在任务中附带它的副本。[…Spark还尝试使用高效的广播算法来分发广播变量,以降低通信成本。

在Spark中,Task(又名命令)是对应于RDD分区的最小的单个执行单元。

有了rdd,事情就清楚了。

rdd = sc.parallelize(range(10))
a_big_list = [1, 5]
result = rdd.map(lambda x : (x, x in a_big_list)).collect
a_big_list_br = sc.boradcast(a_big_list)
result = rdd.map(lambda x : (x, x in a_big_list.value)).collect

两种方法都有效。在第一种情况下,列表被传送到每个任务(即每个分区)。如果变量和RDD很大,那么这可能会耗费时间。在第二种情况下,变量只发送一次到每台机器(计算许多任务),使用优化的分配算法,因此它应该快得多。

现在,回到数据框架和你的问题:

  1. 简短的回答,不,没关系。为什么呢?在编写isin(list)时,实际上是将整个列表放在SparkSQL查询中。因此,Catalyst将把它包含在其物理计划中(在执行任何操作之前),尝试优化该计划,然后运行它。你的执行者不会直接使用这个变量。相反,信息将来自查询本身。因此,广播在这种情况下是完全无用的。当你做你要做的事情时,是驱动程序读取广播变量并将其提供给catalyst,这与普通变量发生的方式完全相同。还要注意,catalyst并不意味着包含数据。试试isin(list)和一个大列表和一个小数据框(spark.range(10))。您将看到,spark甚至需要花费很长时间才能尝试执行您的查询。现在是催化剂建立一个不必要的大型物理计划的时候了。如果列表足够大,催化剂甚至可能崩溃。

  2. 当您在udf中使用变量时,您又回到了前面提到的RDD情况。用一个大的列表试试(如下所示),你会发现它比在物理计划中传递要好得多。此外,广播将产生与rdd相同的影响。在SparkSQL中,udf是广播变量的唯一方法。任何其他的SparkSQL函数都会把你的数据放到物理计划中,你不会希望这样的。

# no broadcast, list shipped to each task
fun = udf(lambda x: x in a_big_list)
spark.range(10).withColumn("x", fun(x)).show(false)
# broadcast, list shipped only once to each machine
fun_br = udf(lambda x: x in a_big_list.value)
spark.range(10).withColumn("x", fun_br(x)).show(false)

最后,请注意,另一种方法是将数据放在使用join的数据框中,无论是否使用广播(对于数据框,使用pyspark.sql.functions.broadcast)

相关内容

  • 没有找到相关文章

最新更新