When I tried to send a `torch.tensor` through `multiprocessing.Pipe()`, I noticed an apparent limit on the size of sendable tensors of roughly 64 MB. If I grow the tensor beyond this limit by even one element, `pipe_connection.send(tensor)` fails with `Process finished with exit code 135 (interrupted by signal 7: SIGEMT)` — see the detailed code below. Since `Pipe`s underlie the implementation of `Queue`s, `Pool`s, etc., this limit breaks the whole `multiprocessing` module when working with larger objects.
Questions:
- Is this reproducible on other systems?
- Can I somehow raise the ~64 MB limit? I would like to use 200 MB+ objects.

I first noticed this with `multiprocessing.Pool().map_async()`, which does not fail but hangs without any exception in the internal `Pool()._outqueue.put()` call. `Pipe` in turn is built on `multiprocessing.reduction.ForkingPickler` and `socket.socketpair()`, but I could not find any limit there either. The exact limit varies slightly depending on which other Python kernels are running in the background. I still have 64 GB of RAM, and we are talking about 64 MB objects...
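To check whether the limit comes from the tensor pickling path at all, a minimal probe (my own sketch, not from the original post) can push plain `bytes` of a comparable size through a `Pipe`:

```python
import multiprocessing as mp

def reader(conn):
    # Receive one object and report its length back on the same connection.
    data = conn.recv()
    conn.send(len(data))
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = mp.Pipe()
    proc = mp.Process(target=reader, args=(child_conn,))
    proc.start()
    # 200 MB of plain bytes, i.e. well above the suspected ~64 MB limit
    payload = b'\x00' * (200 * 1024 ** 2)
    parent_conn.send(payload)  # pickled and streamed through the pipe
    print('received size:', parent_conn.recv())
    proc.join()
```

A concurrent reader process is needed here because `Connection.send()` blocks once the OS pipe buffer fills, so sending and receiving 200 MB within a single process would deadlock. If plain bytes round-trip fine, the limit is likely specific to how torch reduces tensors for interprocess transfer (via shared memory) rather than to the pipe itself.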
import multiprocessing as mp
import torch

def generate_ones_tensor(num_dims):
    print('ONES: generating torch.ones() of shape = ({},)'.format(num_dims))
    ones = torch.ones(size=(num_dims,), dtype=torch.int64)
    print('ONES: torch.ones(size=({},)) generated of approx. size = {}MB'.format(
        num_dims, round(8 * num_dims / (1024 ** 2), 6)))
    return ones

if __name__ == '__main__':
    # Maximum shape of torch tensor that WILL be sent through a pipe
    num_max_dims = 838784
    max_ones = generate_ones_tensor(num_dims=num_max_dims)
    # Minimum shape of torch tensor that WON'T be sent through a pipe
    num_too_many_dims = 8387585
    over_max_ones = generate_ones_tensor(num_dims=num_too_many_dims)

    # Create pipe with connections
    p1, p2 = mp.Pipe()

    # Sending max. byte size
    p1.send(max_ones)
    print('max_ones was send.')
    getted_max_ones = p2.recv()
    print('max_ones was received.')

    # Sending too many bytes
    p1.send(over_max_ones)
    print('over_max_ones was send.')
    getted_over_max_ones = p2.recv()
    print('over_max_ones was received.')
Generated output:
Connected to pydev debugger (build 191.7479.30)
ONES: generating torch.ones() of shape = (838784,)
ONES: torch.ones(size=(838784,)) generated of approx, size = 6.399414MB
ONES: generating torch.ones() of shape = (8387585,)
ONES: torch.ones(size=(8387585,)) generated of approx, size = 63.992195MB
max_ones was send.
max_ones was received.
Process finished with exit code 135 (interrupted by signal 7: SIGEMT)
Edit / appendix:
I tried to work around the problem by chunking the tensors down to 21 MB pieces during the computation with `Pool().starmap_async()`. The chunked tensors are stored in temporary files by the `AsyncResult` instances. But with 3 tensors it throws a `RuntimeError`:
[DEBUG/ForkPoolWorker-1] starting listener and thread for sending handles
[INFO/ForkPoolWorker-1] created temp directory /tmp/pymp-p9qf5b6r
[DEBUG/ForkPoolWorker-4] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-5] CONVERTER: pad dataset['train']['input_ids'] to length = 285
[DEBUG/ForkPoolWorker-2] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-2] starting listener and thread for sending handles
[INFO/ForkPoolWorker-2] created temp directory /tmp/pymp-ubm3dwnp
[DEBUG/ForkPoolWorker-5] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-6] CONVERTER: pad dataset['train']['input_ids'] to length = 285
[DEBUG/ForkPoolWorker-3] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-6] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-7] CONVERTER: pad dataset['train']['input_ids'] to length = 285
[DEBUG/ForkPoolWorker-4] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-4] Possible encoding error while sending result: Error sending result: '[tensor([[[40478, 547, 1999, ..., 0, 0, 0],
[40478, 547, 1999, ..., 0, 0, 0]],
[[40478, 547, 1999, ..., 0, 0, 0],
[40478, 547, 1999, ..., 0, 0, 0]],
[[40478, 547, 1999, ..., 0, 0, 0],
[40478, 547, 1999, ..., 0, 0, 0]],
...,
[[40478, 547, 3898, ..., 0, 0, 0],
[40478, 547, 3898, ..., 0, 0, 0]],
[[40478, 547, 3898, ..., 0, 0, 0],
[40478, 547, 3898, ..., 0, 0, 0]],
[[40478, 547, 3898, ..., 0, 0, 0],
[40478, 547, 3898, ..., 0, 0, 0]]])]'. Reason: 'RuntimeError('unable to write to file </torch_329_1813456617>',)'
[DEBUG/ForkPoolWorker-7] CONVERTER: converting datasets['train']['input_ids'] from list to tensor
[DEBUG/ForkPoolWorker-5] CONVERTER: generated tensor with tensor.shape = torch.Size([5000, 2, 285]) and size = 21.74MB
[DEBUG/ForkPoolWorker-5] Possible encoding error while sending result: Error sending result: '[tensor([[[40478, 547, 3898, ..., 0, 0, 0],
[40478, 547, 3898, ..., 0, 0, 0]],
[[40478, 547, 3898, ..., 0, 0, 0],
[40478, 547, 3898, ..., 0, 0, 0]],
[[40478, 547, 3898, ..., 0, 0, 0],
[40478, 547, 3898, ..., 0, 0, 0]],
...,
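For context, the chunk-and-reassemble pattern attempted above looks roughly like this with plain Python lists standing in for tensors (`convert_chunk`, the chunk size, and the doubling computation are all illustrative, not from the original code; with small list chunks this completes fine, which again points at the tensor transfer rather than the pooling logic):

```python
import multiprocessing as mp

def convert_chunk(chunk):
    # Hypothetical stand-in for the CONVERTER step: process one chunk.
    return [x * 2 for x in chunk]

def chunked(seq, chunk_size):
    # Split a sequence into consecutive chunks of at most chunk_size items.
    return [seq[i:i + chunk_size] for i in range(0, len(seq), chunk_size)]

if __name__ == '__main__':
    data = list(range(10000))
    with mp.Pool(processes=4) as pool:
        # map_async used here for simplicity; the post used starmap_async
        async_result = pool.map_async(convert_chunk, chunked(data, 1000))
        chunks_out = async_result.get()
    # Reassemble the chunked results in order
    results = [x for chunk in chunks_out for x in chunk]
    print(len(results))
```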
Workaround
Thanks to @Fabrizio, the problem could be identified as system-specific: it is not reproducible on another system. For anyone who still runs into the same issue, a simple workaround is to convert the tensor into a binary Python object, e.g. with `pickle.dumps(tensor)`, before sending it through a pipe, or to pickle the return value of the target `func` of `Pool().map_async()`:
import pickle

def target_func(*args, **kwargs):
    # calculate your results
    result = do_your_stuff(...)
    return pickle.dumps(result)
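A matching usage sketch of the pipe variant (a plain list stands in for the tensor here): the receiving side just has to undo the manual pickling with `pickle.loads`:

```python
import pickle
import multiprocessing as mp

if __name__ == '__main__':
    p1, p2 = mp.Pipe()
    obj = list(range(1000))             # stands in for the large tensor
    p1.send(pickle.dumps(obj))          # an opaque bytes object goes through the pipe
    restored = pickle.loads(p2.recv())  # undo the manual pickling
    print(restored == obj)              # True
```

Because the pipe now only ever sees a `bytes` object, the tensor-specific reduction machinery is bypassed entirely.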
Let me know if you run into the same problem...