对象是不可迭代的多处理?



我一直得到不可调用的列表上的类型错误,尽管我在终端上收到输出的打印…在循环中调用list是什么?

def work(page):  
#-------------------------
#make obj of page and do something
grabthis = Some_class1(page)
f = Someclass_2(grabthis,page)
output = f.extract()
print(output)

pages='PDFPAGES'            

#set page
save = []
for page in pages:
go = work(page) 
start = multiprocessing.Process(target=go)
start.start()
save.append(start)
if go == 'norun':
continue

for items in save:
start.join()
Traceback (most recent call last):
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 267, in _bootstrap
self.run()
File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
TypeError: 'list' object is not callable

通过多处理或线程迭代一堆文件的正确方法是什么?

请参阅Michael Butscher发布的评论,因为您的意图很可能是并发(多线程)或并行(多处理)功能work进程页面。不同之处在于,在多线程中,每个线程必须在运行Python代码之前获得全局解释器锁(GIL),因此不会有两个线程同时(并行)执行Python代码。如果work(1)主要是I/O绑定并在等待I/O(或网络请求)完成时释放GIL,那么大多数时间它只是等待或(2)您正在执行释放GIL的C/c++代码(某些库模块用于实现),则这不是太大的问题。否则,您需要进行cpu密集型处理,在这种情况下,多处理是可行的方法。然而,多处理具有串行处理所没有的额外开销,即创建进程和跨进程(不同的地址空间)移动数据。因此,除非work是显著的cpu密集型,否则多处理解决方案将比串行解决方案运行得更慢。

让我们假设work是这样的,多处理是正确的方法。假设除了高CPU需求之外,还涉及到相当数量的等待。然后,创建比CPU内核更多的进程可能是有利的,因为进程将不时地进入等待状态,并允许其他进程运行。但是,如果只涉及很少或根本不涉及I/O处理,那么创建比CPU内核更多的进程将一无所获。让我们假设后者,让N是您拥有的CPU核数,M是您必须处理的页面数。如果M是<=N,那么您可以为每个页面创建一个进程,因为您似乎没有从work返回一个值(但使用多处理池可能更简单):

from multiprocessing import Process
def work(page):  
#-------------------------
#make obj of page and do something
grabthis = Some_class1(page)
f = Someclass_2(grabthis,page)
output = f.extract()
print(output)

pages='PDFPAGES'            
# Required for Windows or any platform that uses the *spawn* method to
# create new processes:
if __name__ == '__main__':            
processes = []
for page in pages:
p = multiprocessing.Process(target=go, args=(page,))
p.start()
processes.append(p)

for p in processes:
p.join()

但如果M>P,即你有更多的页面处理比你的CPU核心,或者如果work需要返回一个结果回主进程,然后我会使用一个多处理池,这也是合适的,即使M<P:

from multiprocessing import Pool, cpu_count
def work(page):  
#-------------------------
#make obj of page and do something
grabthis = Some_class1(page)
f = Someclass_2(grabthis,page)
output = f.extract()
print(output)

pages='PDFPAGES'            
# Required for Windows or any platform that uses the *spawn* method to
# create new processes:
if __name__ == '__main__':            
# This will create a pool whose size is never more than the number of
# CPU cores you have or the number of pages you have to process:
pool_size = min(cpu_count(), len(pages))
pool = Pool()
pool.map(go, pages) # or results = pool.map(go, pages) if `work` returns something
# Cleanup pool:
pool.close()
pool.join()

但是如果work主要是I/O绑定,那么使用多线程池。池大小可以相当大,但您仍然应该将其保持在合理的大小(200?):

from multiprocessing.pool import ThreadPool
def work(page):  
#-------------------------
#make obj of page and do something
grabthis = Some_class1(page)
f = Someclass_2(grabthis,page)
output = f.extract()
print(output)
pages='PDFPAGES'            
if __name__ == '__main__':            
# This will create a pool whose size is never more than 200 or
# the number of pages you have to process:
pool_size = min(200, len(pages))
pool = ThreadPool(pool_size)
pool.map(go, pages) # or results = pool.map(go, pages) if `work` returns something
# Cleanup pool:
pool.close()
pool.join()

注意

以上只是概括。但是,如果您的work函数是迭代文件,那么多线程可能是最好的方法。但是您的磁盘有一个最大数据速率,因此创建更多的线程将无助于提高性能。此外,如果没有固态驱动器,那么并发读取多个文件所导致的额外磁头移动可能会损害性能,并且两个线程的运行速度可能比串行方法慢。您可以从池大小为2开始,看看它是否能提高性能,然后慢慢增加池大小。唯一的问题是,您的操作系统可能会缓存磁盘数据,因此当您使用不同池大小重新运行代码时,由于缓存,它将运行得更快。您要么需要找到一种方法在运行之间清除磁盘缓存,要么在运行之间重新启动。

最新更新