我想通过使用N-Gram来提高我的speech2text模型的准确性。因此,我使用这行代码将函数应用于整个数据集,如下所示:
result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core')))
我为'cpu_core'
设置的CPU核心是8。
这是predict
的功能代码:
def predict(batch):
batch["predicted"] = processor.batch_decode(np.array(batch["logits"])).text[0]
print(batch["predicted"])
return batch
我在try
块中使用这一行,它在while True
循环中,当程序将面临多进程错误时,它将卡在while true
循环中。这是完整的代码:
while True:
try:
dataset = dataset.map(speech_file_to_array_fn)
# If we're using n-gram
if os.environ.get('active_ngram') == '1':
dataset = dataset.map(predict_model)
print("nN-Gram startedn")
result = dataset.map(predict, batch_size=5, num_proc=int(os.environ.get('cpu_core'))) # This is the line that occurs the error
except KeyboardInterrupt:
print('interrupted!')
break
except:
pass
现在我想知道如何处理这个多进程错误。(python 3.8.10和ubuntu 20.04.4)
这是错误:
^C进程ForkPoolWorker-3335:█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████过程ForkPoolWorker-3330:██████████████████████████████████████████████████████████████████████████████████████████████████████|3/3[02:08<00:00,37.41s/ex]流程ForkPoolWorker-19:流程ForkPoolWorker-3333:流程ForkPoolWorker-16:流程ForkPoolWorker-21:流程ForkPoolWorker-13:流程ForkPoolWorker-15:流程ForkPoolWorker-12:流程ForkPoolWorker-14:流程ForkPoolWorker-3336:流程ForkPoolWorker-3331:流程ForkPoolWorker-3334:流程ForkPoolWorker-3332:流程ForkPoolWorker-18:流程ForkPoolWorker-17:#0:25%|██████████████████████████████████████████████████████████████████████████████▌|1/4[14:09:32<42:28:3850972.67s/ex]过程ForkPoolWorker-20:Traceback(最近一次通话):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,356行,在get中res=自我_reader.recv_bytes()文件"/usr/lib/python3.8/multiprocessing/connection.py";,第216行,inrecv_bytesbuf=自我_recv_bytes(maxlength)文件"/usr/lib/python3.8/multiprocessing/connection.py";,第414行,在_recv_bytesbuf=自我_recv(4)文件"//usr/lib/python3.8/multiprocessing/connection.py";,第379行,在_recv中chunk=读取(句柄,剩余)KeyboardInterrupt Traceback(最后一次调用):Traceback(最近一次调用最后一次):File"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt Traceback(最后一次调用):文件"/usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()KeyboardInterrupt File"//usr/lib/python3.8/multiprocessing/process.py";,第315行,in_引导程序self.run()文件"/usr/lib/python3.8/multiprocessing/process.py";,线路108,运行中自我_target(*self._args,**self._kwargs)文件"/usr/lib/python3.8/multiprocessing/pool.py";,114线,工人task=get()文件"usr/lib/python3.8/multiprocessing/queues.py";,355行,在get中自我_rlock:文件//usr/lib/python3.8/multiprocessing/synchronize.py";,第95行,in输入返回self_semlock输入()键盘中断中断^C
我终于修复了这个错误。BrokenPipeError: [Error 32] broken pipe
来自Linux操作系统,它将在您执行IO任务时出现。具体来说,当Linux上的读写管道关闭时,由于通信的另一端仍在尝试读写数据,就会出现这种错误。
现在有趣的部分在这里:我使用了6个workers
,这是我的映射函数中CPU cores
的数字。dataset
迭代中的数据长度为25。因此,执行映射函数的管道有5行,每行有4个文件,1行有5个文件。我想这是错误的原因,而最后一行有5个文件造成了一些干扰。因此,我将数据集中的文件数量从25减少到24,并将工作者数量减少到6并删除了batch_size=5
。这使得数据的长度可以被进程的数量整除,并使错误消失。以下是有关BrokenPipeline Error
的更多信息的链接。