如何实现并行延迟,使并行化的for循环在输出低于阈值时停止?



假设我有以下代码:

from scipy import *
import multiprocessing as mp
num_cores = mp.cpu_count()
from joblib import Parallel, delayed
import matplotlib.pyplot as plt
def func(x,y):
return y/x
def main(y, xmin,xmax, dx):
x = arange(xmin,xmax,dx)
output = Parallel(n_jobs=num_cores)(delayed(func)(i, y) for i in x)
return x, asarray(output)
def demo():
x,z = main(2.,1.,30.,.1)
plt.plot(x,z, label='All values')
plt.plot(x[z>.1],z[z>.1], label='desired range') ## This is better to do in main()
plt.show()
demo()

我只想计算输出,直到输出>给定的数字(可以假设输出元素随着 x 的增加而单调减少(然后停止(不计算 x 的所有值然后排序,这对我的目的来说是低效的(。有没有办法使用并行、延迟或任何其他多处理来做到这一点?

没有指定output > a given number,所以我只是编了一个。 测试后,我不得不逆转 正确操作的条件output < a given number.

我会使用一个池,使用回调函数启动进程以检查停止条件,然后终止池 准备好时。但这会导致争用条件,该条件将允许从正在运行的进程中省略结果 不允许完成。我认为此方法对您的代码进行了最少的修改,并且非常易于阅读。这 不保证列表顺序。

优点:开销
很小 缺点:可能会缺少结果。

方法1(

from scipy import *
import multiprocessing
import matplotlib.pyplot as plt

def stop_condition_callback(ret):
output.append(ret)
if ret < stop_condition:
worker_pool.terminate()

def func(x, y, ):
return y / x

def main(y, xmin, xmax, dx):
x = arange(xmin, xmax, dx)
print("Number of calculations: %d" % (len(x)))
# add calculations to the pool
for i in x:
worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)
# wait for the pool to finish/terminate
worker_pool.close()
worker_pool.join()
print("Number of results: %d" % (len(output)))
return x, asarray(output)

def demo():
x, z_list = main(2., 1., 30., .1)
plt.plot(z_list, label='desired range')
plt.show()

output = []
stop_condition = 0.1
worker_pool = multiprocessing.Pool()
demo()

此方法具有更多开销,但将允许已开始完成的进程。 方法2(

from scipy import *
import multiprocessing
import matplotlib.pyplot as plt

def stop_condition_callback(ret):
if ret is not None:
if ret < stop_condition:
worker_stop.value = 1
else:
output.append(ret)

def func(x, y, ):
if worker_stop.value != 0:
return None
return y / x

def main(y, xmin, xmax, dx):
x = arange(xmin, xmax, dx)
print("Number of calculations: %d" % (len(x)))
# add calculations to the pool
for i in x:
worker_pool.apply_async(func, (i, y,), callback=stop_condition_callback)
# wait for the pool to finish/terminate
worker_pool.close()
worker_pool.join()
print("Number of results: %d" % (len(output)))
return x, asarray(output)

def demo():
x, z_list = main(2., 1., 30., .1)
plt.plot(z_list, label='desired range')
plt.show()

output = []
worker_stop = multiprocessing.Value('i', 0)
stop_condition = 0.1
worker_pool = multiprocessing.Pool()
demo()

方法3(优点: 不会遗漏
任何结果 缺点: 这超出了您通常所做的范围。

采用方法 1 并添加

def stopPoolButLetRunningTaskFinish(pool):
# Pool() shutdown new task from being started, by emptying the query all worker processes draw from
while pool._task_handler.is_alive() and pool._inqueue._reader.poll():
pool._inqueue._reader.recv()
# Send sentinels to all worker processes
for a in range(len(pool._pool)):
pool._inqueue.put(None)

然后更改stop_condition_callback

def stop_condition_callback(ret):
if ret[1] < stop_condition:
#worker_pool.terminate()
stopPoolButLetRunningTaskFinish(worker_pool)
else:
output.append(ret)

我会使用 Dask 并行执行,特别是用于在结果完成时实时反馈结果的 futures 接口。完成后,您可以取消剩余的飞行期货,租用不需要的期货以异步完成或关闭集群。

from dask.distributed import Client, as_completed
client = Client()  # defaults to ncores workers, one thread each
y, xmin, xmax, dx = 2.,1.,30.,.1
def func(x, y):
return x, y/x
x = arange(xmin,xmax,dx)
outx = []
output = []
futs = [client.submit(func, val, y) for val in x]
for future in as_completed(futs):
outs = future.result()
outx.append(outs[0])
output.append(outs[1])
if outs[1] < 0.1:
break

笔记: - 我假设你的意思是"小于",因为否则第一个值已经通过(y / xmin > 0.1( - 如果您想在结果准备就绪时获取结果,则不能保证输出按照您输入的顺序进行,但是通过如此快速的计算,也许它们总是如此(这就是为什么我让 func 也返回输入值的原因( - 如果您停止计算,输出将比全套输入短,所以我不太确定您要打印什么。

最新更新