如何允许多个线程同时修改类的变量



我有一个类(MyClass(,它包含一个需要运行的操作队列(self.msg_queue(,并且我有多个输入源,可以将任务添加到队列中。

现在我有三个想要同时运行的函数:

  • MyClass.get_input_from_user((
    • 在tkinter中创建一个窗口,让用户填写信息,当用户按下submit时,它会将该消息推送到队列中
  • MyClass.get_input_from_server((
    • 检查服务器是否有消息,读取消息,然后将其放入队列。此方法使用MyClass父类中的函数
  • MyClass.execute_next_item_on_the_queue((
    • 从队列中弹出一条消息,然后对其执行操作。这取决于消息是什么,但每条消息都对应于MyClass中的某个方法或其父方法,该方法根据一个大决策树运行

流程描述:在类加入网络后,我让它生成三个线程(上面的每个函数一个(。每个线程函数使用语法"self.msg_queue.put(message("从队列中添加项目,并使用"self.message_queue.get_nowait(("从该队列中删除项目。

问题描述:我遇到的问题是,似乎每个线程都在修改自己的队列对象(它们没有共享类的队列msg_queue,它们的函数都是该类的成员(。

我对多处理器不够熟悉,不知道重要的错误消息是什么;然而,它声明它不能pickle一个weakref对象(它没有指示哪个对象是weakref目标(,并且在queue.put((中调用行"self._sem.acquire(block,timeout("会产生"[WinError 5]Access is denied"错误。假设队列引用中的此故障没有正确复制,是否安全?

[我使用的是Python 3.7.2和多处理器包的进程和队列]

[我看到过多个关于让线程在类之间穿梭信息的问答——创建一个生成队列的主工具,然后将该队列作为参数传递给每个线程。如果这些函数不必使用MyClass中的其他函数,我可以通过让这些函数进入队列并使用局部变量而不是类变量来调整这种策略。]

[我非常确信,这个错误不是将我的队列传递给tkinter对象的结果,因为我的单元测试了我的GUI如何修改其调用方的队列。]

以下是队列错误的最小可复制示例:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time
class MyTest:
def __init__(self):
self.my_q = Queue()
self.counter = 0
def input_function_A(self):
while True:
self.my_q.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
def input_function_B(self):
while True:
self.counter = 0
self.my_q.put(self.counter)
time.sleep(1)
def output_function(self):
while True:
try:
var = self.my_q.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
def run(self):
process_A = Process(target=self.input_function_A)
process_B = Process(target=self.input_function_B)
process_C = Process(target=self.output_function)
process_A.start()
process_B.start()
process_C.start()
# without this it generates the WinError: 
# with this it still behaves as if the two input functions do not modify the queue
process_C.join() 
if __name__ == '__main__':
test = MyTest()
test.run()

事实上,这些不是"线程"-这些是"进程"-而如果您使用的是多线程,而不是多处理,则self.my_q实例将是相同的对象,放置在计算机的相同内存空间中,multiprocessing对流程执行fork,原始流程中的任何数据(在"run"调用中执行的数据(在使用时都会重复,因此,每个子流程都会看到自己的"Queue"实例,与其他子流程无关。

正确的方法是让各种进程共享一个多进程。Queue对象将其作为参数传递给目标方法。重新组织代码使其工作的更简单方法是:

from multiprocessing import Queue
from multiprocessing import Process
import queue
import time
class MyTest:
def __init__(self):
self.my_q = Queue()
self.counter = 0
def input_function_A(self, queue):
while True:
queue.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
def input_function_B(self, queue):
while True:
self.counter = 0
queue.put(self.counter)
time.sleep(1)
def output_function(self, queue):
while True:
try:
var = queue.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)
def run(self):
process_A = Process(target=self.input_function_A, args=(queue,))
process_B = Process(target=self.input_function_B, args=(queue,))
process_C = Process(target=self.output_function, args=(queue,))
process_A.start()
process_B.start()
process_C.start()
# without this it generates the WinError: 
# with this it still behaves as if the two input functions do not modify the queue
process_C.join() 
if __name__ == '__main__':
test = MyTest()
test.run()

正如您所看到的,由于您的类实际上并没有通过实例的属性共享任何数据,因此这种"类"设计对您的应用程序没有多大意义,而是对同一代码块中的不同工作者进行分组。

有一个神奇的多进程类是可能的,它将有一些内部方法来实际启动工作方法并共享Queue实例——因此,如果你在一个项目中有很多这样的方法,那么样板就会少得多。

大致情况:

from multiprocessing import Queue
from multiprocessing import Process
import time

class MPWorkerBase:
def __init__(self, *args, **kw):
self.queue = None
self.is_parent_process = False
self.is_child_process = False
self.processes = []
# ensure this can be used as a colaborative mixin
super().__init__(*args, **kw)
def run(self):
if self.is_parent_process or self.is_child_process:
# workers already initialized
return
self.queue = Queue()
processes = []
cls = self.__class__
for name in dir(cls):
method = getattr(cls, name)
if callable(method) and getattr(method, "_MP_worker", False):
process = Process(target=self._start_worker, args=(self.queue, name))
self.processes.append(process)
process.start()
# Setting these attributes here ensure the child processes have the initial values for them.
self.is_parent_process = True
self.processes = processes
def _start_worker(self, queue, method_name):
# this method is called in a new spawned process - attribute
# changes here no longer reflect attributes on the
# object in the initial process
# overwrite queue in this process with the queue object sent over the wire:
self.queue = queue
self.is_child_process = True
# call the worker method
getattr(self, method_name)()
def __del__(self):
for process in self.processes:
process.join()

def worker(func):
"""decorator to mark a method as a worker that should
run in its own subprocess
"""
func._MP_worker = True
return func

class MyTest(MPWorkerBase):
def __init__(self):
super().__init__()
self.counter = 0
@worker
def input_function_A(self):
while True:
self.queue.put(self.counter)
self.counter = self.counter + 1
time.sleep(0.2)
@worker
def input_function_B(self):
while True:
self.counter = 0
self.queue.put(self.counter)
time.sleep(1)
@worker
def output_function(self):
while True:
try:
var = self.queue.get_nowait()
except queue.Empty:
var = -1
except:
break
print(var)
time.sleep(1)

if __name__ == '__main__':
test = MyTest()
test.run()

相关内容

  • 没有找到相关文章

最新更新