我有一个multiprocessing.manager.Array
对象,将由多个工作线程共享以记录观察到的事件:数组中的每个元素都保存不同事件类型的计数。递增计数需要读取和写入操作,因此我认为为了避免竞争条件,每个工作线程都需要请求涵盖两个阶段的锁,例如
with lock:
my_array[event_type_index] += 1
我的直觉是,应该可以在特定的数组元素上放置一个锁。使用这种类型的锁,工作线程 #1 可以在工作线程 #2 增加元素 2 的同时递增元素 1。这对我的应用程序(n-gram 计数(特别有用,其中数组长度非常大并且很少发生冲突。
但是,我不知道如何为数组请求元素锁。multiprocessing
中是否存在这样的事情,或者有解决方法吗?
为了获得更多上下文,我在下面包含了我当前的实现:
import multiprocessing as mp
from queue import Empty
def count_ngrams_in_sentence(n, ngram_counts, char_to_idx_dict, sentence_queue, lock):
while True:
try:
my_sentence_str = sentence_queue.get_nowait()
my_sentence_indices = [char_to_idx_dict[i] for i in my_sentence_str]
my_n = n.value
for i in range(len(my_sentence_indices) - my_n + 1):
my_index = int(sum([my_sentence_indices[i+j]*(27**(my_n - j - 1))
for j in range(my_n)]))
with lock: # lock the whole array?
ngram_counts[my_index] += 1
sentence_queue.task_done()
except Empty:
break
return
if __name__ == '__main__':
n = 4
num_ngrams = 27**n
num_workers = 2
sentences = [ ... list of sentences in lowercase ASCII + spaces ... ]
manager = mp.Manager()
sentence_queue = manager.JoinableQueue()
for sentence in sentences:
sentence_queue.put(sentence)
n = manager.Value('i', value=n, lock=False)
char_to_idx_dict = manager.dict([(i,ord(i)-97) for i in string.ascii_lowercase] + [(' ', 26)],
lock=False)
lock = manager.Lock()
ngram_counts = manager.Array('l', [0]*num_ngrams, lock=lock)
''
workers = [mp.Process(target=count_ngrams_in_sentence,
args=[n,
ngram_counts,
char_to_idx_dict,
sentence_queue,
lock]) for i in range(num_workers)]
for worker in workers:
worker.start()
sentence_queue.join()
Multiprocessing.manager.Array 带有一个内置锁。必须切换到RawArray。
有一个锁列表。在修改代码之前,请获取数组的锁。然后释放。
locks[i].acquire()
array[i,:]=0
locks[i].release()
正如我所说,如果数组是MultiProcessing.RawArray或类似的,则多个进程可以同时读取或写入。对于某些类型的数组,读取/写入数组本质上是原子的 - 锁本质上是内置的。在继续之前,请仔细研究这一点。
至于性能,在 Python 中索引到列表是纳秒的数量级,获取和释放锁是微秒量级的。这不是一个大问题。