Python TypeError when passing generator (yield) output to a worker pool: trying to split a large file into chunks of lines



The code below raises a strange TypeError at the job.get() line:

multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
  File "C:Python36libmultiprocessingpool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "G:emdmppurger.py", line 41, in process_wrapper
    run(line)
  File "G:emdmppurger.py", line 25, in run
    if correct(copy):
  File "G:emdmppurger.py", line 4, in correct
    print('Not Equal to 14? ' + item)
TypeError: must be str, not list
"""
    job.get()
  File "C:Python36libmultiprocessingpool.py", line 644, in get
    raise self._value
TypeError: must be str, not list

TypeError "string not list".
  File "g:EMDmppurger.py", line 76, in <module>
    job.get()
  File "C:Python36Libmultiprocessingpool.py", line 644, in get
    raise self._value
builtins.TypeError: must be str, not list

The only list involved should be the jobs list. But isn't that an iterable rather than an iterator?

My assumption is that the error refers to the arguments I am trying to pass to the process_wrapper function.

import multiprocessing as mp

cores = 16
pool = mp.Pool(cores)
jobs = []
# create jobs
for chunkStart, chunkSize in chunkify("out.txt"):
    jobs.append(pool.apply_async(process_wrapper, (chunkStart, chunkSize)))
# wait for all jobs to finish; get() re-raises any exception from the worker
for job in jobs:
    job.get()
# clean up
pool.close()

My generator function, which yields chunkStart and chunkSize, looks like this:

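import os

def chunkify(fname, size=1024*1024):
    # Yield (chunkStart, chunkSize) pairs whose boundaries fall on line ends.
    fileEnd = os.path.getsize(fname)
    # Binary mode, so that seek() and tell() work on plain byte offsets.
    with open(fname, 'rb') as f:
        chunkEnd = f.tell()
        while chunkEnd < fileEnd:
            chunkStart = chunkEnd
            f.seek(chunkStart + size, 0)
            f.readline()  # advance to the end of the current line
            # Clamp, because seeking past the end of the file is allowed.
            chunkEnd = min(f.tell(), fileEnd)
            chunkSize = chunkEnd - chunkStart
            yield chunkStart, chunkSize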
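For context, here is a rough sketch of how a worker might turn one (chunkStart, chunkSize) pair back into lines. The real process_wrapper is not shown in the question, so read_chunk_lines and demo are hypothetical names, the UTF-8 encoding is an assumption, and the chunkify variant is included only to keep the example self-contained:

```python
import os
import tempfile

def chunkify(fname, size=1024 * 1024):
    """Yield (chunkStart, chunkSize) pairs whose boundaries fall on line ends."""
    fileEnd = os.path.getsize(fname)
    with open(fname, 'rb') as f:
        chunkEnd = 0
        while chunkEnd < fileEnd:
            chunkStart = chunkEnd
            f.seek(chunkStart + size)
            f.readline()  # move forward to the end of the current line
            chunkEnd = min(f.tell(), fileEnd)
            yield chunkStart, chunkEnd - chunkStart

def read_chunk_lines(fname, chunkStart, chunkSize, encoding='utf-8'):
    """Hypothetical helper: return the lines of one chunk as str objects."""
    with open(fname, 'rb') as f:
        f.seek(chunkStart)
        data = f.read(chunkSize)
    # Each element is a str, so a worker can safely concatenate it with text.
    return data.decode(encoding).splitlines()

def demo():
    """Write a small temporary file, chunk it, and check nothing is lost."""
    lines = ['row {}'.format(i) for i in range(1000)]
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as tmp:
        tmp.write('\n'.join(lines) + '\n')
        path = tmp.name
    try:
        recovered = []
        for start, length in chunkify(path, size=256):
            recovered.extend(read_chunk_lines(path, start, length))
        return recovered == lines
    finally:
        os.remove(path)

if __name__ == '__main__':
    print(demo())
```

Because every chunk ends right after a newline (or at the end of the file), decoding each chunk on its own should not split a line or a multi-byte character, at least for UTF-8 input.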

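I am going to assume this was an indentation error or something similar. I could not find the error, but it disappeared when I rewrote the code.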
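For what it is worth, the innermost frame of the remote traceback (correct, line 4) concatenates a string literal with item, and job.get() only re-raises that exception in the parent process. A minimal sketch, assuming item is a list at that point (the rest of purger.py is not shown, and both function names below are hypothetical), reproduces the same kind of TypeError and shows one possible way to build the message:

```python
def describe_bad_original(item):
    # Mirrors the expression from the traceback; fails when item is a list.
    return 'Not Equal to 14? ' + item

def describe_bad(item):
    # Hypothetical alternative: format() accepts any object, including a list.
    return 'Not Equal to 14? {}'.format(item)

if __name__ == '__main__':
    fields = ['a', 'b']  # assumed shape of item, for illustration only
    try:
        describe_bad_original(fields)
    except TypeError as exc:
        print('TypeError:', exc)
    print(describe_bad(fields))
```

If item is meant to be a single line of text, the more likely fix is upstream, wherever the line gets split into a list before being passed to correct.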
