在 Python 中使用多处理读取文件时的同步



我有一个python函数,可以从一个大文件中读取随机片段并对其进行一些处理。我希望处理在多个进程中进行,因此要利用多处理。我在父进程中打开文件(以二进制模式(,并将文件描述符传递给每个子进程,然后使用多处理。Lock(( 以同步对文件的访问。对于单个工作线程,事情按预期工作,但对于更多的工作线程,即使使用锁,文件读取也会随机返回错误数据(通常来自文件的一部分,来自文件另一部分的一点(。此外,文件中的位置(由 file.tell(( 返回(经常会搞砸。这一切都表明访问描述符的基本竞争条件,但我的理解是多处理。Lock(( 应该阻止并发访问它。file.seek(( 和/或 file.read(( 是否有某种不包含在锁定/解锁屏障中的异步操作?这是怎么回事?

一个简单的解决方法是让每个进程单独打开文件并获得自己的文件描述符(我已经确认这确实有效(,但我想了解我缺少什么。以文本模式打开文件还可以防止问题发生,但不适用于我的用例,并且无法解释二进制情况下发生了什么。

我已经在许多 Linux 系统和 OS X 以及各种本地和远程文件系统上运行了以下复制器。我总是得到很多错误的文件位置和至少几个校验和错误。我知道读取不能保证读取请求的全部数据量,但我已经确认这不是这里发生的事情,并省略了该代码以保持简洁。

import argparse
import multiprocessing
import random
import string
def worker(worker, args):
rng = random.Random(1234 + worker)
for i in range(args.count):
block = rng.randrange(args.blockcount)
start = block * args.blocksize
with args.lock:
args.fd.seek(start)
data = args.fd.read(args.blocksize)
pos = args.fd.tell()
if pos != start + args.blocksize:
print(i, "bad file position", start, start + args.blocksize, pos)
cksm = sum(data)
if cksm != args.cksms[block]:
print(i, "bad checksum", cksm, args.cksms[block])
args = argparse.Namespace()
args.file = '/tmp/text'
args.count = 1000
args.blocksize = 1000
args.blockcount = args.count
args.filesize = args.blocksize * args.blockcount
args.num_workers = 4
args.cksms = multiprocessing.Array('i', [0] * args.blockcount)
with open(args.file, 'w') as f:
for i in range(args.blockcount):
data = ''.join(random.choice(string.ascii_lowercase) for x in range(args.blocksize))
args.cksms[i] = sum(data.encode())
f.write(data)
args.fd = open(args.file, 'rb')  
args.lock = multiprocessing.Lock()
procs = []
for i in range(args.num_workers):
p = multiprocessing.Process(target=worker, args=(i, args))
procs.append(p)
p.start()

示例输出:

$ python test.py
158 bad file position 969000 970000 741000
223 bad file position 908000 909000 13000
232 bad file position 679000 680000 960000
263 bad file position 959000 960000 205000
390 bad file position 771000 772000 36000
410 bad file position 148000 149000 42000
441 bad file position 677000 678000 21000
459 bad file position 143000 144000 636000
505 bad file position 579000 580000 731000
505 bad checksum 109372 109889
532 bad file position 962000 963000 243000
494 bad file position 418000 419000 2000
569 bad file position 266000 267000 991000
752 bad file position 732000 733000 264000
840 bad file position 801000 802000 933000
799 bad file position 332000 333000 989000
866 bad file position 150000 151000 248000
866 bad checksum 109116 109375
887 bad file position 39000 40000 974000
937 bad file position 18000 19000 938000
969 bad file position 20000 21000 24000
953 bad file position 542000 543000 767000
977 bad file position 694000 695000 782000

这似乎是由缓冲引起的:使用open(args.file, 'rb', buffering=0)我无法再重现了。

https://docs.python.org/3/library/functions.html#open

缓冲

是用于设置缓冲策略的可选整数。通过 0 关闭缓冲 [...]当未给出缓冲参数时,默认缓冲策略的工作方式如下:[...]二进制文件以固定大小的块缓冲;缓冲区的大小 [...] 通常为 4096 或 8192 字节长。[...]

我已经检查过,只使用多处理。锁定(无缓冲= 0(,仍然满足bad datamultiprocessing.Lockbuffering=0,一切都很顺利

相关内容

  • 没有找到相关文章

最新更新