加快UDF访问数据湖的速度



我正在尝试编写一个UDF来用地理位置信息丰富IP列。我想要用于浓缩的数据以IP范围到国家的形式存储在数据湖中。为了读取文件,我使用Java API,但我发现这非常慢。例如,阅读一百万行需要2分钟以上。读取完整的文件需要几个小时,效率非常低。下面是我用来读取文件的代码:

from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = (SparkSession.builder
.master(...).getOrCreate())
sc = spark._sc
hadoopConf = sc._jsc.hadoopConfiguration()
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
BufferedReader = sc._gateway.jvm.java.io.BufferedReader
InputStreamReader = sc._gateway.jvm.java.io.InputStreamReader
datalake_file_system = '...'
account_name = '...'
fs = FileSystem.get(URI("abfss://{}@{}.dfs.core.windows.net".format(datalake_file_system, account_name)), hadoopConf)
file_path = Path('...')
f = fs.open(file_path)
r = BufferedReader(InputStreamReader(f))
while True:
line = r.readLine()
fields = line.split(',')
if fields[0].startswith('start_ip'):
continue
# Load IP range to country mapping in a map
...

是否有更快的方法来执行这样的操作?

我的UDF背后的想法是在地图中加载IP范围/地理信息,然后一旦地图加载到内存中,就在UDF中执行查找。在Spark中可能有其他更有效的方法来做到这一点。我想听听有没有更典型的方法来做这件事。原始信息在一个表中,我已经编写了代码来执行表之间的连接,但是由于合并涉及IP范围,直接连接的效率非常低。我已经使用了存储桶,它确实加快了速度,但我想看看使用UDF是否会更有效。

我发现解决这个问题的最佳解决方案是创建一个Java UDF,它在映射中执行延迟加载和查找。

最新更新