Python concurrency with concurrent.futures.ThreadPoolExecuto



请考虑以下代码片段:

import concurrent.futures
import time
from random import random
class Test(object):
def __init__(self):
self.my_set = set()
def worker(self, name):
temp_set = set()
temp_set.add(name)
temp_set.add(name*10)
time.sleep(random() * 5)
temp_set.add(name*10 + 1)
self.my_set = self.my_set.union(temp_set) # question 1
return name
def start(self):
result = []
names = [1,2,3,4,5,6,7]
with concurrent.futures.ThreadPoolExecutor(max_workers=len(names)) as executor:
futures = [executor.submit(self.worker, x) for x in names]
for future in concurrent.futures.as_completed(futures):
result.append(future.result()) # question 2
  1. 是否有可能通过标记为"问题 1"的行self.my_set损坏?我相信union是原子的,但分配不是问题吗?

  2. 标有"问题 2"的行是否有问题?我相信列表append是原子的,所以也许这没关系。

我读过这些文档:

https://docs.python.org/3/library/stdtypes.html#set https://web.archive.org/web/20201101025814id_/http://effbot.org/zone/thread-synchronization.htm Python 变量赋值是原子的吗? https://docs.python.org/3/glossary.html#term-global-interpreter-lock

并执行了此问题中提供的片段代码,但我找不到在这种情况下并发应该如何工作的明确答案。

关于问题1:想想这里发生了什么:

self.my_set = self.my_set.union(temp_set)

至少有三个不同步骤的序列

  1. worker调用获取self.my_set的副本(对Set对象的引用)
  2. union函数构造一个新集合。
  3. worker分配self.my_set来引用新构建的集合。

那么,如果两个或多个工人同时尝试做同样的事情会发生什么?(注意:不能保证以这种方式发生,但可能会以这种方式发生。

  1. 他们每个人都可以获取对原始my_set的引用。
  2. 他们每个人都可以计算一个新的集合,只包括my_set的原始成员加上它自己的贡献。
  3. 他们每个人都可以将其新集分配给my_set变量。

问题出在第三步。如果以这种方式发生,那么这些新集合中的每一个都只包含创建它的一个工作人员的贡献。不会有一套包含所有工人的新贡献。当一切都结束时,my_set只会引用其中一个新集合——最后一个执行分配的线程将"获胜"——其他新集合都将被丢弃。

防止这种情况的一种方法是使用互斥来防止其他线程尝试计算其新集并同时更新共享变量:

class Test(object):
def __init__(self):
self.my_set = set()
self.my_set_mutex = threading.Lock()
def worker(self, name):
...
with self.my_set_mutex
self.my_set = self.my_set.union(temp_set)
return name

关于问题2:附加到列表是否是"原子的"并不重要。result变量是start方法的局部变量。在你显示的代码中,除了创建列表的线程之外,任何其他线程都无法访问result引用的列表。线程之间不会有任何干扰,除非您与其他线程共享列表。

最新更新