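I am using concurrent.futures.ProcessPoolExecutor() inside a class, but the method operation_data, which is supposed to call the method operation, never calls it: "inside concurrent insert" and "do some operations" are never printed.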
import concurrent.futures

class ConcurrentTest:
    def operation(self, chunk):
        print("inside concurrent insert")
        print("do some operations")

    def operation_data(self):
        chuck_list = [chunk for chunk in self.chunks([millions_of_data_in_this_list], 10000)]
        print('chunk list done')
        with concurrent.futures.ProcessPoolExecutor() as executor:
            for c in chuck_list:
                executor.submit(operation, c)
        print("done")

    @staticmethod
    def chunks(l, n):
        for i in range(0, len(l), n):
            yield l[i:i + n]

concurrent_test = ConcurrentTest()
concurrent_test.operation_data()
Am I doing something wrong? Any suggestions are welcome.
Thanks in advance!
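Change
executor.submit(operation, c)
to
executor.submit(self.operation, c)
Inside operation_data the bare name operation is not defined (Python raises NameError); a method has to be accessed through self.
Alternatively, use Executor.map() instead:
executor.map(self.operation, chunk_list)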
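A minimal sketch of the first fix, assuming operation returns sum(chunk) as a stand-in for the real work (so the parent can check that the calls happened) and that the data is passed in as an argument instead of the placeholder list. Every chunk is submitted with self.operation first, and the results are collected only afterwards:

```python
import concurrent.futures


class ConcurrentTest:
    def operation(self, chunk):
        # Runs in a worker process. sum() is a placeholder for the real work;
        # returning a value lets the parent confirm the call actually ran.
        return sum(chunk)

    @staticmethod
    def chunks(lst, n):
        # Yield successive n-sized slices of lst.
        for i in range(0, len(lst), n):
            yield lst[i:i + n]

    def operation_data(self, data, n):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # self.operation (a picklable bound method), not the bare name operation.
            futures = [executor.submit(self.operation, c) for c in self.chunks(data, n)]
            # Wait for results only after every chunk has been submitted.
            return [f.result() for f in futures]


# The guard keeps worker processes from re-running this block on
# platforms that start workers with "spawn" (e.g. Windows, macOS).
if __name__ == "__main__":
    results = ConcurrentTest().operation_data(list(range(100)), 10)
    print(results)
```

Calling f.result() is also what surfaces any exception raised inside operation; a future whose result is never requested fails silently.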
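Note that map() can split the original list into chunks for you, so you can simplify the code further by passing the big list directly:
executor.map(self.operation, your_big_list, chunksize=n)
With chunksize, operation is still called once per element; n only controls how many elements are sent to a worker process in each task.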
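To illustrate what chunksize does and does not change, here is a small sketch (using a module-level function operation that squares its input, purely for illustration). Each call receives a single element, and map() yields results in input order regardless of how the items were batched:

```python
import concurrent.futures


def operation(item):
    # Called once per element of the input, even when chunksize > 1.
    return item * item


def run(data, n):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # chunksize only controls how many items are shipped to a worker per task;
        # the results still come back in the same order as the input.
        return list(executor.map(operation, data, chunksize=n))


if __name__ == "__main__":
    squares = run(range(10), 3)
    print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

If operation really needs a whole list per call (for example, to do a bulk insert), keep the explicit chunking and pass the list of chunks to map() instead.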
Edit: As @Booboo pointed out in the comments, calling future.result() immediately after each submit gives you no parallelism, so I have edited my answer accordingly.
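A rough sketch contrasting the two patterns, assuming a hypothetical slow_square task that sleeps for 0.5 s to stand in for real work. Calling result() right after submit() blocks before the next task is even submitted, so the tasks run one at a time; submitting everything first lets the pool run them concurrently:

```python
import concurrent.futures
import time


def slow_square(x):
    time.sleep(0.5)  # stand-in for real work
    return x * x


def result_right_after_submit(items):
    # Anti-pattern: each result() blocks before the next submit(),
    # so the tasks execute one after another.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        return [executor.submit(slow_square, x).result() for x in items]


def submit_all_then_collect(items):
    # Submit everything first; the pool works on the tasks concurrently.
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(slow_square, x): x for x in items}
        results = {}
        # as_completed yields futures in completion order, not submission order.
        for fut in concurrent.futures.as_completed(futures):
            results[futures[fut]] = fut.result()
        return [results[x] for x in items]


if __name__ == "__main__":
    items = [1, 2, 3, 4]
    for fn in (result_right_after_submit, submit_all_then_collect):
        start = time.perf_counter()
        out = fn(items)
        print(f"{fn.__name__}: {out} in {time.perf_counter() - start:.2f}s")
```

Both functions return the same values; only the elapsed time differs, and the exact timings depend on the machine and on process start-up cost.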
EDIT2: Here is a complete example using executor.map, with all unnecessary code removed and the chunksize parameter:
import concurrent.futures

class ConcurrentTest:
    def operation(self, chunk):
        print(f"{chunk} inside concurrent insert")
        print(f"do some operations with {chunk}")

    def operation_data(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            executor.map(self.operation, range(20), chunksize=3)
        print("done")

# Required where workers are started with "spawn" (e.g. Windows, macOS)
if __name__ == "__main__":
    concurrent_test = ConcurrentTest()
    concurrent_test.operation_data()
Output (yours may differ):
0 inside concurrent insert
do some operations with 0
1 inside concurrent insert
do some operations with 1
--- skipped for brevity ---
13 inside concurrent insert
do some operations with 13
14 inside concurrent insert
do some operations with 14
18 inside concurrent insert
do some operations with 18
19 inside concurrent insert
do some operations with 19
15 inside concurrent insert
11 inside concurrent insert
do some operations with 15
16 inside concurrent insert
do some operations with 16
do some operations with 11
17 inside concurrent insert
do some operations with 17
done
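One caveat with the complete example: the iterator returned by executor.map is never consumed, so if operation raises in a worker, the exception is silently discarded. A minimal sketch that consumes the results, assuming a hypothetical failure on item 13 purely to show the error reaching the parent:

```python
import concurrent.futures


class ConcurrentTest:
    def operation(self, item):
        # Hypothetical failure, only to demonstrate error propagation.
        if item == 13:
            raise ValueError(f"bad item {item}")
        return item

    def operation_data(self):
        with concurrent.futures.ProcessPoolExecutor() as executor:
            # Iterating map()'s result re-raises any exception from a worker;
            # discarding the iterator would hide it.
            return list(executor.map(self.operation, range(20), chunksize=3))


if __name__ == "__main__":
    try:
        ConcurrentTest().operation_data()
    except ValueError as exc:
        print(f"worker raised: {exc}")
```

Wrapping the call in list() also means operation_data only returns after every result is available.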