Python:在多进程生成的两个线程之间共享变量.过程



Python 3.1.2

我在多处理生成的两个线程之间共享变量时遇到问题。过程。它是简单的布尔变量,应该确定线程是否应该运行或应该停止执行。下面是三种情况下的简化代码(但使用与我的原始代码相同的机制):

  1. 线程的主要类。螺纹类型和self.is_running布尔类型[工作正常]。
  2. 多进程的主类。进程类型和布尔型self.is_running [不工作。子线程具有self.is_running的本地副本,而不是共享它]。
  3. 多进程的主类。进程类型和self.is_running为多处理类型。值("b",真)[工作正常]。

我想要的是了解为什么它以这种方式而不是另一种方式工作。(即为什么第 2 点没有像我假设的那样工作)。

测试是通过python的解释器完成的:

from testclass import *
d = TestClass()
d.start()
d.stop()

下面是第 1 点的示例:

import threading
import time
import queue
import multiprocessing
class TestClass(threading.Thread):
def __init__(self):
    threading.Thread.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()
def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z
def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()
def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)
def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第 2 点的示例:

import threading
import time
import queue
import multiprocessing

class Test(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = True
    self.sema = threading.Semaphore()
def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running
    self.sema.release()
    return z
def stop(self):
    self.sema.acquire()
    self.is_running = False
    print("STOPPING")
    self.sema.release()
def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)
def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

第 3 点的示例:

import threading
import time
import queue
import multiprocessing
class TestClass(multiprocessing.Process):
def __init__(self):
    multiprocessing.Process.__init__(self)
    self.q = queue.Queue(10)
    self.is_running = multiprocessing.Value("b", True)
    self.sema = threading.Semaphore()
def isRunning(self):
    self.sema.acquire()
    print ("Am I running?", self.is_running)
    z = self.is_running.value
    self.sema.release()
    return z
def stop(self):
    self.sema.acquire()
    self.is_running.value = False
    print("STOPPING")
    self.sema.release()
def reader(self):
    while self.isRunning():
        print("R] Reading!")
        try:
            data = self.q.get(timeout=1)
        except:
            print("R] NO DATA!")
        else:
            print("R] Read: ", data)
def writer(self):
    while self.isRunning():
        print("W] Writing!")
        self.q.put(time.time())
        time.sleep(2)
def run(self):
    tr = threading.Thread(target=self.reader)
    tw = threading.Thread(target=self.writer)
    tr.start()
    tw.start()
    tr.join()
    tw.join()

线程都是同一进程的一部分,因此它们共享内存。另一个后果是线程不能完全同时由不同的CPU执行,因为一个进程只能由一个CPU拾取。

进程具有单独的内存空间。一个 CPU 可以运行一个进程,同时另一个 CPU 运行另一个进程。需要特殊的结构来让流程协作。

在第 2 点中,父进程和子进程都有自己的 is_running 副本。在父进程中调用stop()时,它只会修改父进程中的is_running,而不修改子进程中的。多处理的原因。价值工作是它的内存在两个进程之间共享。

如果需要进程感知队列,请使用多处理。队列。

最新更新