Multiprocessing pool with an iterator



I want to use a multiprocessing pool with an iterator, so that a function is executed in a thread that splits the iterator into batches of N elements until the iterator is exhausted.

import arcpy
from multiprocessing import Pool

def insert(rows):
    with arcpy.da.InsertCursor(r"c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        #i_cursor is an iterator
        for row in rows:
            i_cursor.insertRow(row)

input_rows = []
count = 0
pool = Pool(4)
with arcpy.da.SearchCursor(r"c:\temp.gdb\test", fields=["*"]) as s_cursor:
    #s_cursor is an iterator
    for row in s_cursor:
        if (count < 100):
            input_rows.append(row)
            count += 1
        else:
            #send 100 rows to the insert function in a new thread
            pool.apply_async(insert, input_rows)
            #reset count and input_rows
            count = 1
            input_rows = [row]

pool.join()
pool.close()

My question: is this script the right approach? Is there a better way to do it?

The script is probably wrong, because I get the following error at pool.join():

Traceback (most recent call last):
  File "G:\Maxime\truncate_append_pool.py", line 50, in <module>
    pool.join()
  File "C:\App\Python27\ArcGIS10.3\lib\multiprocessing\pool.py", line 460, in join
    assert self._state in (CLOSE, TERMINATE)
AssertionError

If I had to guess what is mainly wrong with your code, I'd say it's in how you pass your input_rows to your process function insert() - the way multiprocessing.Pool.apply_async() works is that it unpacks the arguments passed to it, so your insert() function actually receives 100 arguments instead of one argument holding a list of 100 elements. This causes an immediate error before your process function even gets a chance to start. (As an aside, the AssertionError in your traceback is a separate issue: pool.join() must be called after pool.close() or pool.terminate(), not before.) If you change the call to pool.apply_async(insert, [input_rows]) it might start working... but you'd also be defeating the purpose of iterators, and you might as well convert your whole input iterator into a list and feed slices of 100 to multiprocessing.Pool.map() and be done with it.
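To make the argument-packing point concrete, here is a minimal, arcpy-free sketch contrasting the failing call with the fixed one and with the Pool.map() alternative mentioned above; the fake rows and the tiny insert() stand-in are made up purely for illustration:

import multiprocessing

def insert(rows):
    # expects ONE argument: a list of rows (stand-in for the real insert function)
    return len(rows)

if __name__ == "__main__":
    rows = [("a", 1), ("b", 2), ("c", 3)]  # stand-in for 100 cursor rows
    pool = multiprocessing.Pool(2)

    # pool.apply_async(insert, rows) would end up calling
    # insert(("a", 1), ("b", 2), ("c", 3)) and fail with a TypeError in the worker;
    # wrapping the list in another list makes it a single argument:
    result = pool.apply_async(insert, [rows])
    print(result.get())  # -> 3

    # the Pool.map() alternative: pre-slice everything into chunks and map over them
    chunks = [rows[i:i + 2] for i in range(0, len(rows), 2)]
    print(pool.map(insert, chunks))  # -> [2, 1]

    pool.close()
    pool.join()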

But you asked whether there is a "better" way of doing this. While "better" is a relative category, in an ideal world multiprocessing.Pool comes with the handy imap() (and imap_unordered()) methods, intended to consume iterables and spread them over the selected pool in a lazy fashion (so the whole iterator is not run through before processing starts), so all you'd need to do is build your iterator slices and pass them to it for processing, i.e.:

import arcpy
import itertools
import multiprocessing

# a utility function to get us a slice of an iterator, as an iterator
# when working with iterators maximum laziness is preferred
def iterator_slice(iterator, length):
    iterator = iter(iterator)
    while True:
        res = tuple(itertools.islice(iterator, length))
        if not res:
            break
        yield res

def insert(rows):
    with arcpy.da.InsertCursor(r"c:\temp2.gdb\test", fields=["*"]) as i_cursor:
        for row in rows:
            i_cursor.insertRow(row)

if __name__ == "__main__":  # guard for multi-platform use
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        for result in pool.imap_unordered(insert, iterator_slice(s_cursor, 100)):
            pass  # do whatever you want with your result (return from your process function)
        pool.close()  # all done, close cleanly

(Btw, your code wouldn't give you the last slice for any s_cursor size that isn't a multiple of 100 - the leftover rows are never submitted.)
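Here is a tiny, arcpy-free sketch of that pitfall: with the counting pattern from the question, whatever is left in input_rows when the loop ends is silently dropped unless it is flushed explicitly (the flush line at the end is the assumed fix, not part of the original code):

rows = range(250)        # stand-in for s_cursor; 250 is not a multiple of 100
batches = []             # stand-in for the pool.apply_async() calls

input_rows = []
count = 0
for row in rows:
    if count < 100:
        input_rows.append(row)
        count += 1
    else:
        batches.append(input_rows)   # "send 100 rows to the pool"
        count = 1
        input_rows = [row]

batches.append(input_rows)           # assumed fix: flush the last partial batch
print([len(b) for b in batches])     # -> [100, 100, 50]; without the flush: [100, 100]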

But... it would be great if it actually worked as advertised. While a lot of it has been fixed over the years, imap_unordered() will still take a hefty sample of your iterator when producing its own internal iterator (far larger than the actual number of pool processes), so if that is a concern you'll have to get down and dirty yourself - and you are on the right track: apply_async() is the way to go when you want to control how your pool gets fed, you just need to make sure you feed it properly:

if __name__ == "__main__":
    with arcpy.da.SearchCursor(r"c:\temp.gdb\test", fields=["*"]) as s_cursor:
        pool = multiprocessing.Pool(processes=4)  # let's use 4 workers
        cursor_iterator = iterator_slice(s_cursor, 100)  # slicer from above, for convenience
        queue = []  # a queue for our current worker async results, a deque would be faster
        while cursor_iterator or queue:  # while we have anything to do...
            try:
                # add our next slice to the pool:
                queue.append(pool.apply_async(insert, [next(cursor_iterator)]))
            except (StopIteration, TypeError):  # no more data, clear out the slice iterator
                cursor_iterator = None
            # wait for a free worker or until all remaining finish
            while queue and (len(queue) >= pool._processes or not cursor_iterator):
                process = queue.pop(0)  # grab a process response from the top
                process.wait(0.1)  # let it breathe a little, 100ms should be enough
                if not process.ready():  # a sub-process has not finished execution
                    queue.append(process)  # add it back to the queue
                else:
                    # you can use process.get() to get the result if needed
                    pass
        pool.close()

Now your s_cursor iterator is consumed only when the next 100 results are needed (i.e. whenever your insert() process function exits, cleanly or otherwise).

UPDATE - the previously posted code had a bug in how it cleared out the queue at the end if results needed to be captured; this version should do the job nicely. We can easily test it with some mock functions:

import random
import time

# just an example generator to prove lazy access by printing when it generates
def get_counter(limit=100):
    for i in range(limit):
        if not i % 3:  # print every third generation to reduce verbosity
            print("Generated: {}".format(i))
        yield i

# our process function, just prints what's passed to it and waits for 1-6 seconds
def test_process(values):
    time_to_wait = 1 + random.random() * 5
    print("Processing: {}, waiting: {:0.2f} seconds".format(values, time_to_wait))
    time.sleep(time_to_wait)
    print("Processed: {}".format(values))

Now we can interweave them like this:

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)  # let's use just 2 workers
    count = get_counter(30)  # get our counter iterator set to iterate from 0-29
    count_iterator = iterator_slice(count, 7)  # we'll process them in chunks of 7
    queue = []  # a queue for our current worker async results, a deque would be faster
    while count_iterator or queue:
        try:
            # add our next slice to the pool:
            queue.append(pool.apply_async(test_process, [next(count_iterator)]))
        except (StopIteration, TypeError):  # no more data, clear out the slice iterator
            count_iterator = None
        # wait for a free worker or until all remaining workers finish
        while queue and (len(queue) >= pool._processes or not count_iterator):
            process = queue.pop(0)  # grab a process response from the top
            process.wait(0.1)  # let it breathe a little, 100ms should be enough
            if not process.ready():  # a sub-process has not finished execution
                queue.append(process)  # add it back to the queue
            else:
                # you can use process.get() to get the result if needed
                pass
    pool.close()

And the result is (it will, of course, vary from system to system):

Generated: 0
Generated: 3
Generated: 6
Generated: 9
Generated: 12
Processing: (0, 1, 2, 3, 4, 5, 6), waiting: 3.32 seconds
Processing: (7, 8, 9, 10, 11, 12, 13), waiting: 2.37 seconds
Processed: (7, 8, 9, 10, 11, 12, 13)
Generated: 15
Generated: 18
Processing: (14, 15, 16, 17, 18, 19, 20), waiting: 1.85 seconds
Processed: (0, 1, 2, 3, 4, 5, 6)
Generated: 21
Generated: 24
Generated: 27
Processing: (21, 22, 23, 24, 25, 26, 27), waiting: 2.55 seconds
Processed: (14, 15, 16, 17, 18, 19, 20)
Processing: (28, 29), waiting: 3.14 seconds
Processed: (21, 22, 23, 24, 25, 26, 27)
Processed: (28, 29)

This proves that our generator/iterator is asked for data only when there is a free slot in the pool to do the work, ensuring minimal memory usage (and/or I/O thrashing, if that's what your iterators end up doing). You won't get it much more streamlined than this. The only additional, albeit marginal, speed-up you can obtain is to reduce the wait time (but your main process will then consume more resources) and to increase the allowed queue size, which in the above code is locked to the number of processes (at the expense of memory) - if you use while queue and (len(queue) >= pool._processes + 3 or not count_iterator): it will load 3 more iterator slices ahead of time, ensuring less of a delay whenever a process finishes and a slot in the pool frees up.
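If you end up tuning those two knobs often, it can be convenient to pull the feeding loop into a small helper; the following is only a sketch of that idea (the name feed_pool and its backlog/poll parameters are made up here, not part of the answer above), built from the same apply_async() pattern:

def feed_pool(pool, slices, worker, backlog=0, poll=0.1):
    """Feed `slices` (an iterator of chunks) to `pool`, keeping at most
    pool._processes + backlog tasks in flight; yields finished results."""
    queue = []  # pending AsyncResult objects
    while slices or queue:
        try:
            queue.append(pool.apply_async(worker, [next(slices)]))
        except (StopIteration, TypeError):  # iterator exhausted (or already cleared)
            slices = None
        while queue and (len(queue) >= pool._processes + backlog or not slices):
            task = queue.pop(0)
            task.wait(poll)            # give it a moment to finish
            if not task.ready():
                queue.append(task)     # not done yet, re-queue it
            else:
                yield task.get()       # hand the result back to the caller

# usage, mirroring the mock test above:
# pool = multiprocessing.Pool(processes=2)
# for result in feed_pool(pool, iterator_slice(get_counter(30), 7), test_process, backlog=3):
#     pass
# pool.close()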
