python中异步函数中要调用的所有参数是什么


  1. 我有一个函数var。我想知道如何利用系统中所有的处理器、内核和RAM内存,通过多处理/并行处理快速运行该函数中的循环。

    import numpy as np
    from pysheds.grid import Grid
    xs = 82.1206, 80.8707, 80.8789, 80.8871, 80.88715
    ys = 25.2111, 16.01259, 16.01259, 16.01259, 15.9956
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    def var(interest):
    variable_avg = []
    for (x,y) in zip(xs,ys):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label') 
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    variable_avg.append(variablemean)
    return(variable_avg)
    
  2. 如果对于给定的函数的多个参数,我可以并行地运行函数var和其中的循环,那将是非常棒的。例如:同时调用var(a)var(b)。由于它将比单独对多个坐标(xs,ys(并行化循环消耗更少的时间。

pysheds文档可在此处找到

grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')代码中使用的data.tif数据可以直接从此处下载。相同的数据可以用不同的名称复制到目录中,并在a = r'/home/test/image1.tif'的位置使用以及用于测试代码的CCD_ 9

为了加快上面的代码,我在这里得到了一个建议,如下所示:

def process_poi(interest, x, y):
grid = Grid.from_raster(interest, data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch')
variable = grid.view('catch', nodata=np.nan)
variable = np.array(variable)
return variable.mean()
async def var_loop_async(interest, pool, loop):
tasks = []
for (x,y) in zip(xs,ys):
function_call = functools.partial(process_poi, interest, x, y)
tasks.append(loop.run_in_executor(pool, function_call))
return await asyncio.gather(*tasks)
async def main():
loop = asyncio.get_event_loop()
pool_start = time.time()
tasks = []
with ProcessPoolExecutor() as pool:
for _ in range(100):
tasks.append(var_loop_async(a, pool, loop))
results = await asyncio.gather(*tasks)
pool_end = time.time()
print(f'Process pool took {pool_end-pool_start}')
serial_start = time.time() 

但是,我不明白如何调用函数var_loop_async(interest, pool, loop)。事实上,我无法获得要调用什么参数来代替poolloop

我对python编程很陌生。

如果可能的话,请将上述建议作为一个可复制的解决方案,以便它可以直接在python中运行。或者,如果你有其他更好的建议来加快原始代码的速度,请告诉我。

首先,在您的原始代码中,我看到:

for (x,y) in zip(xs,ys):
grid = Grid.from_raster(interest, data_name='map')

我不熟悉pysheds模块,也找不到任何关于它的文档,所以我不知道Grid.from_raster是否是一个昂贵的操作。但这个语句似乎是在for循环之上移动而不是在循环中重新计算的候选者。也许仅此一项就可以获得显著的性能改进。链接,python中异步函数中要调用的所有参数是什么?,您提到的建议表明,创建进程池的开销可能不足以补偿这些麻烦。此外,如果Grid.from_raster是昂贵的,并且通过将其从循环中移除而获利,那么多处理解决方案本质上是";将其放回循环中";通过使它为每个x,y对执行,从而使多处理解决方案不太可能导致性能改进。

无论如何,要使用建议的技术运行代码,请参阅下面的内容。很遗憾,不能在处理器池中同时运行process_poivar_loop_async。但请在下面进一步查找不同的解决方案。

import numpy
from pysheds.grid import Grid
from concurrent.futures.process import ProcessPoolExecutor
import asyncio

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = (
r'/home/test/image1.tif',
r'/home/test/image2.tif'
)

def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean

async def var_loop_async(interest, pool, loop):
tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
return await asyncio.gather(*tasks)

async def main():
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as pool:
tasks = [var_loop_async(file, pool, loop) for file in file_list]
results = await asyncio.gather(*tasks)
print(results)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

不同的解决方案

您希望能够在进程池中为要处理的每个文件运行var,然后在子进程中处理每个x,y对。这意味着您需要处理文件的每个子进程都有自己的进程池来处理x,y对。这通常是不可能的,因为为进程池创建的进程是守护进程进程(当主进程终止时,它们会自动终止(,并且不允许创建自己的子进程。为了克服这一点,我们必须创建自己的mutliprocessor.Pool专业化,并用自己的池初始化每个子进程。

但这会提高性能吗var子进程除了等待process_poi子进程完成它们的工作之外,基本上什么都不做。因此,我不希望这比以前的代码有太大的改进。而且,正如我所提到的,目前还不清楚这两种多处理解决方案是否会比原始代码有所改进,尤其是修改为重新定位Grid.from_raster调用的解决方案。

import numpy
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os
# This allows subprocesses to have their own pools:
class NoDaemonProcess(multiprocessing.Process):
# make 'daemon' attribute always return False
def _get_daemon(self):
return False
def _set_daemon(self, value):
pass
daemon = property(_get_daemon, _set_daemon)
class NoDaemonContext(type(multiprocessing.get_context())):
Process = NoDaemonProcess
class MyPool(multiprocessing.pool.Pool):
def __init__(self, *args, **kwargs):
kwargs['context'] = NoDaemonContext()
super(MyPool, self).__init__(*args, **kwargs)

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

pool2 = None
def init_pool():
global pool2
#pool2 = Pool(5)
pool2 = Pool(os.cpu_count // 2) # half the available number of processors

def process_poi(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean

def var(interest):
task = functools.partial(process_poi, interest)
return pool2.starmap(task, zip(xs, ys))

def main():
# This will create non-daemon processes so that these processes can create their own pools:
with MyPool(2, init_pool) as pool:
results = pool.map(var, [a, b])
print(results)

if __name__ == "__main__":
main()

使用线程的第三种解决方案

使用asyncio:

import numpy
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio
xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = [
r'/home/test/image1.tif',
r'/home/test/image2.tif'
]
def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean

async def var_loop_async(interest, pool, loop):
tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
return await asyncio.gather(*tasks)

async def main():
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=100) as pool:
tasks = [var_loop_async(file, pool, loop) for file in file_list]
results = await asyncio.gather(*tasks)
print(results)

if __name__ == "__main__":
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main())
finally:
loop.close()

备选方案:

import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306
a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

def process_poi(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean

def var(executor, interest):
return list(executor.map(functools.partial(process_poi, interest), xs, ys))

def main():
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(functools.partial(var, executor), [a, b]))
print(results)

if __name__ == "__main__":
main()

使用基于OP更新代码的线程的更新解决方案

import numpy
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)
file_list = (
r'/home/test/image1.tif',
r'/home/test/image2.tif'
)

def process_point(interest, x, y):
grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
grid.clip_to('catch')
grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
variablemask = grid.view('variable', nodata=np.nan)
variablemask = numpy.array(variablemask)
variablemean = np.nanmean(variablemask)
return variablemean
def var(executor, interest):
return list(executor.map(functools.partial(process_point, interest), xs, ys))

def main():
with ThreadPoolExecutor(max_workers=100) as executor:
results = list(executor.map(functools.partial(var, executor), file_list))
print(results)

if __name__ == "__main__":
main()

相关内容

  • 没有找到相关文章

最新更新