SPARK SQL性能 - 在最小和最大之间的价值连接



我有两个文件:

  1. IP范围 - 国家查找
  2. 来自不同IP的请求列表

IPS作为整数存储(使用Inet_aton())。

我尝试使用Spark SQL将两个文件加载到数据范围并将其注册为临时表。

GeoLocTable - ipstart, ipend, ...additional Geo location data
Recordstable - INET_ATON, ...3 more fields

我尝试使用SPARK SQL使用SQL语句加入这些数据 -

"select a.*, b.* from Recordstable a left join GeoLocTable b on a.INET_ATON between b.ipstart and b.ipend"

Recordstable中约有850K记录,地球可观的记录约为250万。与约有20位执行人的加入约2小时。

我尝试过缓存和广播可寻求的地理位置,但似乎并没有帮助。我已经碰到spark.sql.autobroadcastjointhreshold = 300000000和spark.sql.shuffle.spartitions =600。

Spark UI显示了正在执行的广播开发室。这是我应该期望的最好的吗?我尝试搜索将执行这种类型的连接的条件,但文档似乎很少。

ps-我正在使用Pyspark与Spark一起使用。

问题的来源很简单。当您执行加入和加入条件时,不是基于平等的,Spark现在唯一能做的就是将其扩展到笛卡尔产品,然后过滤器BroadcastNestedLoopJoin内部发生的事情几乎是什么。因此,从逻辑上讲,您拥有这个巨大的嵌套环,可以测试所有850K * 250万记录。

这种方法显然极低效率。由于查找表看起来很适合内存,因此最简单的改进是使用本地,分类的数据结构而不是SPARK DataFrame。假设您的数据看起来像这样:

geo_loc_table = sc.parallelize([
    (1, 10, "foo"), (11, 36, "bar"), (37, 59, "baz"),
]).toDF(["ipstart", "ipend", "loc"])
records_table = sc.parallelize([
    (1,  11), (2, 38), (3, 50)
]).toDF(["id", "inet"])

我们可以通过ipstart进行项目和对参考数据进行分类,并创建广播变量:

geo_start_bd = sc.broadcast(geo_loc_table
  .select("ipstart")
  .orderBy("ipstart") 
  .flatMap(lambda x: x)
  .collect())

接下来,我们将使用UDF和Bisect模块来增强records_table

from bisect import bisect_right
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
# https://docs.python.org/3/library/bisect.html#searching-sorted-lists
def find_le(x):
    'Find rightmost value less than or equal to x'
    i = bisect_right(geo_start_bd.value, x)
    if i:
        return geo_start_bd.value[i-1]
    return None
records_table_with_ipstart = records_table.withColumn(
    "ipstart", udf(find_le, LongType())("inet")
)

最后加入两个数据集:

 records_table_with_ipstart.join(geo_loc_table, ["ipstart"], "left")

另一种可能性是使用Apache DataFu中的join_with_range API的Python版本进行加入。这将使您的范围爆炸成多行,因此Spark仍然可以进行等电量。

您需要使用以下参数调用Pyspark(从此处获取)。

export PYTHONPATH=datafu-spark_2.11-1.6.0.jar
pyspark --jars datafu-spark_2.11-1.6.0-SNAPSHOT.jar --conf spark.executorEnv.PYTHONPATH=datafu-spark_2.11-1.6.0-SNAPSHOT.jar

,然后您会像这样进行加入:

from pyspark_utils.df_utils import PySparkDFUtils
df_utils = PySparkDFUtils()
func_joinWithRange_res = df_utils.join_with_range(df_single=records_table,col_single="INET_ATON",df_range=geo_loc_table,col_range_start="ipstart",col_range_end="ipend",decrease_factor=10)
func_joinWithRange_res.registerTempTable("joinWithRange")

参数 10是最大程度地减少爆炸行的数量:它影响"存储量"的数量。创建。您可以进行此游戏以提高性能。

全面披露 - 我是datafu的成员。

相关内容

  • 没有找到相关文章

最新更新