我想使用我们的 Spark 集群并行运行程序。我的想法是像下面这样做:
def simulate():
#some magic happening in here
return 0
spark = (
SparkSession.builder
.appName('my_simulation')
.enableHiveSupport()
.getOrCreate())
sc = spark.sparkContext
no_parallel_instances = sc.parallelize(xrange(500))
res = no_parallel_instances.map(lambda row: simulate())
print res.collect()
我的问题是是否有办法使用不同的参数执行simulate()
。我目前能想象的唯一方法是有一个指定参数的数据帧,所以像这样:
parameter_list = [[5,2.3,3], [3,0.2,4]]
no_parallel_instances = sc.parallelize(parameter_list)
res = no_parallel_instances.map(lambda row: simulate(row))
print res.collect()
有没有另一种更优雅的方式来运行带有 Spark 的并行函数?
如果您要参数化呼叫的数据在每行之间不同,那么是的,您需要在每行中包含这些数据。
但是,如果要设置影响每一行的全局参数,则可以使用广播变量。
http://spark.apache.org/docs/latest/rdd-programming-guide.html#broadcast-variables
广播变量在脚本中创建一次,之后无法修改。 Spark 将有效地将这些值分发给每个执行程序,使其可用于转换。 要创建一个,您需要向 Spark 提供数据,它会为您提供一个句柄,您可以使用该句柄在执行器上访问它。 例如:
settings_bc = sc.broadcast({
'num_of_donkeys': 3,
'donkey_color': 'brown'
})
def simulate(settings, n):
# do magic
return n
no_parallel_instances = sc.parallelize(xrange(500))
res = no_parallel_instances.map(lambda row: simulate(settings_bc.value, row))
print res.collect()