我有一个json
文件,里面只有一个对象:
incme.json
:
{
"value": 0
}
我正在尝试使用带有ProcessPoolExecutor
的multiprocessing
更新它,并使用multiprocessing.Lock
防止竞争条件:
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
import numpy as np
import json
def inc(tup):
lock, ignoreme = tup
with lock:
with open('incme.json', 'r') as f:
data = json.load(f)
print (data)
data['value'] += 1
with lock:
with open('incme.json', 'w') as f:
json.dump(data, fp=f, indent=4)
ignoreme += 1
if __name__ == '__main__':
m = mp.Manager()
lock = m.Lock()
NUMBER_OF_CPUS = mp.cpu_count()
# use up to +++ of the cpu`s available
USE_UP_TO = 0.5
inc((lock, 1))
with ProcessPoolExecutor(max_workers=np.uint16(NUMBER_OF_CPUS * USE_UP_TO)) as executor:
for i in range(100):
print('inc:')
executor.submit(inc, ((lock, 1)))
当上面的代码运行时,它将使value
44
或低于101
。
以这种方式使用锁时:
def inc(tup):
lock, ignoreme = tup
with lock:
with open('incme.json', 'r') as f:
data = json.load(f)
print (data)
data['value'] += 1
with open('incme.json', 'w') as f:
json.dump(data, fp=f, indent=4)
ignoreme += 1
value
变得101
但现在它无法异步工作。 什么原因导致这种情况? 它与 IO 相关任务有关吗?
您的锁似乎保护得太少。是的,您以原子方式读取,并以原子方式写入,但您不以原子方式执行读取-增量-写入序列。例如,没有什么可以阻止所有 100 个进程读取 0,然后每个进程将 1 加到 0,然后每个进程写出 1 作为新值。
相反,请尝试删除第二个with lock:
语句,并缩进print()
和递增语句,以便以原子方式完成整个读-增量-写序列。
编辑
哎呀!我现在看到你已经尝试过了,并且已经发现它有效。所以我只是对你为什么认为原始方式"应该"工作感到困惑。它显然不应该;-)