Network-bound transformation and threads



I am trying to use a REST API to enrich the data I have in a Spark DataFrame. The REST API is not built by me and only takes a single input at a time (there is no batch option). Unfortunately, the REST API latency is higher than I would like, so my Spark application seems to spend much of its time waiting for the API as it iterates row by row. Although my REST API has high latency, it also has very high throughput/capacity, which my Spark application does not seem to be using fully.

Since my application appears to be network-bound, I would like to know whether it makes sense to use threading to speed it up. Does Spark already do this internally? If using threads does make sense, is there a simple way to implement it? Has anybody done this successfully?

I ran into the same problem when fetching data from blob storage. Below is a small, self-contained dummy example that I think you can easily modify for your needs. In the example, you should be able to observe that constructing df_slow takes much longer than constructing df_fast. It works by making each worker process a list of rows in parallel, instead of processing one row at a time sequentially. You can swap the slowAdd function with your own Row-transforming function. The slowAdd function simulates network latency by sleeping for 0.1 seconds.

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import Row
# Just some dataframe with numbers
data = [(i,) for i in range(0, 1000)]
df = spark.createDataFrame(data, T.StructType([T.StructField("Data", T.IntegerType())]))
# Get an rdd that contains 'Rows', then turn it into an rdd of 'list of Rows'
standardRdd = df.rdd # contains [row1, row2, row3, ...]
number_of_partitions = 10
repartitionedRdd = standardRdd.repartition(number_of_partitions) # contains [row1, row2, row3, ...] but repartitioned to increase parallelism
glomRdd = repartitionedRdd.glom() # contains roughly [[row1, row2, row3, ..., row100], [row101, row102, row103, ...], ...]
# where the number of sublists corresponds to the number of partitions

# Define a transformation function with an artificial delay.
# Substitute this with your own transformation function.
import time
def slowAdd(r):
    d = r.asDict()
    d["Data"] = d["Data"] + 100
    time.sleep(0.1)
    return Row(**d)
# Define a function that maps the slowAdd function from 'list of Rows' to 'list of Rows' in parallel
import concurrent.futures
def slowAdd_with_thread_pool(list_of_rows):
    # Each call handles one partition's rows; up to 100 calls are in flight at once
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as thread_pool:
        return list(thread_pool.map(slowAdd, list_of_rows))
# Perform a fast mapping from 'list of Rows' to 'Rows'.
transformed_fast_rdd = glomRdd.flatMap(slowAdd_with_thread_pool)
# For reference, perform a slow mapping from 'Rows' to 'Rows'
transformed_slow_rdd = repartitionedRdd.map(slowAdd)
# Convert the RDDs back to DataFrames
df_fast = spark.createDataFrame(transformed_fast_rdd)
#This sum operation will be fast (~100 threads sleeping in parallel on each worker) 
df_fast.agg(F.sum(F.col("Data"))).show()
df_slow = spark.createDataFrame(transformed_slow_rdd)
#This sum operation will be slow (only 1 thread sleeping at a time on each worker)
df_slow.agg(F.sum(F.col("Data"))).show()
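
For the original use case, slowAdd would be replaced by a function that actually calls the REST service. Below is a minimal sketch of what that could look like, assuming a hypothetical endpoint https://api.example.com/enrich that takes the value as a query parameter and returns JSON with a result field (the URL, the "value" parameter, the "result" field, and the Enriched column are all placeholders for your API), using the third-party requests library. It expresses the same per-partition threading with mapPartitions instead of glom() plus flatMap():

import concurrent.futures
import requests  # third-party HTTP client, assumed available on the workers
from pyspark.sql import Row

# Hypothetical enrichment call: adjust the request and response handling
# to whatever your API actually expects and returns.
def enrich_row(r):
    d = r.asDict()
    resp = requests.get(
        "https://api.example.com/enrich",
        params={"value": d["Data"]},
        timeout=10,
    )
    resp.raise_for_status()
    d["Enriched"] = resp.json().get("result")
    return Row(**d)

# Same pattern as glom() + flatMap(): every partition gets its own thread
# pool, so many requests per worker are in flight at the same time.
def enrich_partition(rows):
    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as pool:
        return list(pool.map(enrich_row, rows))

# Reuses spark, df and number_of_partitions from the example above.
enriched_rdd = df.rdd.repartition(number_of_partitions).mapPartitions(enrich_partition)
df_enriched = spark.createDataFrame(enriched_rdd)

Keep max_workers (and the number of partitions) in line with what the API can actually absorb, and always set a timeout so a single stuck request does not stall the whole partition.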
