Astroscrappy not working with multiprocessing in Jupyter



I have some multiprocessing code that tries to run several astroscrappy processes at once. However, as soon as it actually reaches the astroscrappy call, everything hangs. I'm running this in a Jupyter notebook.

def a_test(i, q):
    import astroscrappy
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    q.put([a, i])

bundle = []
import multiprocessing as mp
queue = mp.Manager().Queue()
processes = []
for k, item in enumerate(paths):
    processes.append(mp.Process(target=a_test, args=(item, queue)))

# Run processes
for p in processes:
    p.start()
for p in processes:
    bundle.append(queue.get())

It only prints 1, 2, 2.5, but never prints 3 after the astroscrappy call. Any idea why this doesn't work?

If you look at astroscrappy's GitHub: https://github.com/astropy/astroscrappy, you'll see that it already uses its own parallelism, namely OpenMP. Trying to multiprocess this kind of function further will yield marginal gains, or none at all. Your best bet is to stick with the speed you already have, which should be quite fast.
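If you still want to layer process-level parallelism on top of astroscrappy's internal threading, it can help to cap the OpenMP thread count so the worker processes don't oversubscribe the CPU. A minimal sketch, assuming astroscrappy's OpenMP code honours the standard OMP_NUM_THREADS environment variable (it has to be set before the import); the function name a_test_capped is just for illustration:

import os

# Assumption: astroscrappy's OpenMP threading honours OMP_NUM_THREADS.
# Set it before importing so each worker process uses at most 2 threads.
os.environ["OMP_NUM_THREADS"] = "2"

import astroscrappy
from astropy.io import fits

def a_test_capped(path):
    # detect_cosmics returns (cosmic-ray mask, cleaned array)
    s = fits.getdata(path)
    crmask, cleaned = astroscrappy.detect_cosmics(s)
    return [crmask, cleaned, path]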

The code is spawning multiple processes, and each process must be given a proper chance to terminate. Calling join() on each process does exactly that. I tested the code below with 3 files and was able to observe the processes running concurrently. On my machine the execution time was 7-9 seconds.

import time
import astroscrappy
from astropy.io import fits
import multiprocessing as mp

bundle = []
processes = []
paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def a_test(i, q):
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    q.put([a, path])

if __name__ == '__main__':
    start = time.time()
    queue = mp.Manager().Queue()
    for item in paths:
        processes.append(mp.Process(target=a_test, args=(item, queue)))

    # Run processes
    for p in processes:
        p.start()
    # Collect one result per process
    for p in processes:
        bundle.append(queue.get())
    # Wait for child processes to finish
    for p in processes:
        p.join()

    end = time.time()
    print(f'Execution time: {end - start} seconds')

Output:

1
2
2.5
1
2
2.5
1
2
2.5
3
3
3
Execution time: 9.224327564239502 seconds

This code does not run out of the box in a Jupyter notebook. To understand why multiprocessing does not behave as expected inside a Jupyter notebook, you can refer to this discussion.

There is a workaround, though. To make this code work in a Jupyter notebook, a_test needs to be called from a separate Python file. For example, I created a Python file named functest.py (in the same directory the notebook runs from) with the following code:

import astroscrappy
from astropy.io import fits

def a_test(*args):
    path = args[0]
    s = fits.getdata(path)
    a = astroscrappy.detect_cosmics(s)
    return [a, path]

Now run the code below in the notebook. Note that I use Pool instead of Process, and the output will not contain a_test's print statements such as 1, 2, 2.5, etc. I removed them from a_test on purpose, since they would not be printed to the Jupyter notebook output anyway. Instead, I print out bundle to verify the processing.

import time
import multiprocessing as mp
import functest as f

paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def main():
    t1 = time.time()
    pool_size = len(paths)
    with mp.Pool(processes=pool_size) as pool:
        bundle = pool.map(f.a_test, [item for item in paths])
    print(bundle)
    print(f"Execution time: {time.time() - t1} seconds")

if __name__ == '__main__':
    main()

Output:

[[(array([[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
...,
[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False]]), array([[4.8158903, 4.808729 , 4.7792015, ..., 4.6927767, 4.7188225,
4.706318 ],
[4.765932 , 4.75....
Execution time: 8.333278179168701

Another alternative to multiprocessing is the concurrent.futures module, which runs in a Jupyter notebook without any issue. The code below can be run in a Jupyter notebook. With it I was able to bring the execution time down to 5-6 seconds.

import time
import astroscrappy
from astropy.io import fits
from concurrent.futures import ThreadPoolExecutor

paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def a_test1(*args):
    print(1)
    path = args[0]
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    return [a, path]

def main():
    t1 = time.time()
    n_threads = len(paths)
    with ThreadPoolExecutor(n_threads) as executor:
        futures = [executor.submit(a_test1, item) for item in paths]
        bundle = [future.result() for future in futures]
    print(bundle)
    print(f"Execution time: {time.time() - t1}")

if __name__ == '__main__':
    main()

Output:

1
1
1
2
2.5
2
2.5
2
2.5
3
3
3
[[(array([[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
...,
[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False],
[False, False, False, ..., False, False, False]]), array([[4.8158903, 4.808729 , 4.7792015, ..., 4.6927767, 4.7188225,
4.706318 ]....
Execution time: 5.936185836791992

Another option is the threading module. Here is a simple example that runs in Jupyter. The execution time I got was around 5-6 seconds.

import time
import astroscrappy
from astropy.io import fits
from threading import Thread

paths = [r"C:\Users\xxxx\FITS\sample.fits", r"C:\Users\xxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]
thread_objs = []

def a_test(i):
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)

def main():
    t1 = time.time()

    for item in paths:
        thread_objs.append(Thread(target=a_test, args=(item,)))
    # Run each thread
    for thread in thread_objs:
        thread.start()

    # Wait for each thread to finish
    for thread in thread_objs:
        thread.join()
    print(f"Execution time: {time.time() - t1}")

main()

Output:

1
1
1
2
2.5
2
2.5
2
2.5
3
3
3
Execution time: 6.320343971252441

Note that if you don't need to work with a_test's result outside that function, you don't need to return anything from it. That saves some additional time.
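For example, if all you ultimately want is the cleaned frame on disk, the worker can write it out itself instead of returning the arrays to the parent. A minimal sketch, assuming a made-up output naming scheme (the _clean.fits suffix and the a_test_save name are just for illustration):

import astroscrappy
from astropy.io import fits

def a_test_save(path):
    # Detect cosmic rays and write the cleaned frame next to the input file,
    # so nothing has to be returned (or pickled back) to the caller.
    s = fits.getdata(path)
    crmask, cleaned = astroscrappy.detect_cosmics(s)
    out_path = path.replace(".fits", "_clean.fits")  # hypothetical naming scheme
    fits.writeto(out_path, cleaned, overwrite=True)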

I also ran some tests with joblib. I refactored your code and share some test results below:

import time
from joblib import Parallel, delayed
import astroscrappy
from astropy.io import fits

paths = [r"C:\Users\xxxxx\FITS\sample.fits", r"C:\Users\xxxxx\FITS\sample1.fits", r"C:\Users\xxxx\FITS\sample2.fits"]

def a_test(i):
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    return [a, i]

def main():
    t1 = time.time()
    a = Parallel(n_jobs=len(paths))(delayed(a_test)(i) for i in paths)
    print(f"Execution time: {time.time() - t1}")

main()

Output:

Execution time: 6.360047101974487

The print statements did not show up in the Jupyter notebook output, but the execution time was between 6 and 7 seconds.

In summary, I did not observe a significant difference in execution time between any of these approaches. That may well be because I tested on a small data set (only 3 files). However, concurrent.futures consistently showed slightly better results. You can try all of these approaches and compare which one suits your use case best.

Using joblib's Parallel, I was able to get this code running faster without it hanging.

def a_test_parallel(i):
    import astroscrappy
    print(1)
    path = i
    s = fits.getdata(path)
    print(2)
    print(2.5)
    a = astroscrappy.detect_cosmics(s)
    print(3)
    return [a, i]

a = Parallel(n_jobs=15)(delayed(a_test_parallel)(i) for i in paths[:100])

I ran it a few times and compared it against the code without Parallel. It ran almost twice as fast. I'm still not sure why multiprocessing doesn't work, but at least this does.

Edit: after running this on a larger data set and doing some additional tweaking, the code doesn't actually run twice as fast. It is in fact slightly slower than the synchronous code.
