如何修复"类型错误:无法将队列传递给子进程中的线程时腌制_thread.lock 对象"



我整天都被困在这个问题上,我找不到与我要完成的工作有关的任何解决方案。

我试图将队列传递给子过程中产生的线程。队列是在入口文件中创建的,并将每个子过程作为参数传递。

我正在制作模块化程序以a(运行神经网络b(在需要时自动更新网络模型c(日志事件/图像从神经网络到服务器。我以前的程序偶像偶像运行多个线程,并且速度越来越慢,所以我决定我需要在程序的某些部分中进行副处理,以便它们可以在自己的内存空间中运行到最大的潜力。

子过程:

  1. 客户服务器通信
  2. 网络摄像头控制和图像处理
  3. 对神经网络的推断(有2个具有自己的过程的神经网络(

4个子过程。

随着我的开发,我需要在每个过程中进行交流,以便它们都在同一页面上,其中包括服务器的事件以及其他内容。因此,据我所知,队列将是最好的选择。

(澄清:从"多处理"模块中的"队列",而不是"队列"模块(

~~但是~~

这些子过程中的每个子处理都产生了自己的线程。例如,第一个子过程将产生多个线程:每个队列的一个线程以聆听来自不同服务器的事件并将其交给程序的不同区域;一个线程听取从一个神经网络中接收图像的队列;一个线程以收听网络摄像头接收实时图像的队列;一个线程以收听从另一个神经网络接收输出的队列。

我可以无问题地将队列传递给子过程,并可以有效地使用它们。但是,当我尝试将它们传递到每个子过程中的线程时,我会收到上述错误。

我是多处理的新手;但是,它背后的方法看起来与线程相对较差,除了共享内存空间和GIL。

这是来自main.py;程序入口。

from lib.client import Client, Image
from multiprocessing import Queue, Process
class Main():
    def __init__(self, server):
        self.KILLQ = Queue()
        self.CAMERAQ = Queue()
        self.CLIENT = Client((server, 2005), self.KILLQ, self.CAMERAQ)
        self.CLIENT_PROCESS = Process(target=self.CLIENT.do, daemon=True)
        self.CLIENT_PROCESS.start()
if __name__ == '__main__':
    m = Main('127.0.0.1')
    while True:
        m.KILLQ.put("Hello world")

这是来自client.py(在一个名为lib的文件夹中(

class Client():
    def __init__(self, connection, killq, cameraq):
        self.TCP_IP = connection[0]
        self.TCP_PORT = connection[1]
        self.CAMERAQ = cameraq
        self.KILLQ = killq
        self.BUFFERSIZE = 1024
        self.HOSTNAME = socket.gethostname()
        self.ATTEMPTS = 0
        self.SHUTDOWN = False
        self.START_CONNECTION = MakeConnection((self.TCP_IP, self.TCP_PORT))
        # self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)
        # self.KILLQ_THREAD.start()
    def do(self):
        # The function ran as the subprocess from Main.py
        print(self.KILLQ.get())
    def _listen(self, q):
        # This is threaded multiple times listening to each Queue (as 'q' that is passed when the thread is created)
        while True:
            print(self.q.get())

# self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)

这是丢弃错误的地方。如果我发表评论,则该程序运行良好。我可以在此子过程中从队列中阅读没有问题(即函数'do'(,而不是在此子过程下的线程中(即函数'_listen'(。

我需要能够在每个过程中进行通信,以便它们可以与主程序保持一致(即,在神经网络模型更新的情况下,推理子过程需要关闭,以便可以更新模型而不会引起错误(。

对此的任何帮助都很好!

我也对其他可以使用的交流方法非常开放。如果您认为更好的沟通过程将起作用;它需要足够快以支持从相机发送到服务器的4K图像的实时流。

非常感谢您的宝贵时间!:(

队列不是问题。multiprocessing软件包中的产品设计为挑选,以便可以在过程之间共享。

问题是,您的线程KILLQ_THREAD是在主过程中创建的。线程在过程之间不共享线程。实际上,当一个过程按照POSIX标准分叉时,在父进程中活跃的线程是不是的一部分,该过程被克隆到新孩子的内存空间。原因之一是呼叫fork()时的静音状态可能会导致儿童过程中的死锁。

您必须将线程的创建转移到孩子过程中,即

def do(self):
    self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,), daemon=True)
    self.KILLQ_THREAD.start()

大概,KILLQ应该向儿童过程关闭发出信号。在这种情况下,尤其是如果您打算使用多个儿童过程,队列不是实现这一目标的最佳方法。由于Queue.get()Queue.get_nowait() 删除队列中的项目,因此只能由一个消费者检索和处理每个项目。您的生产商将不得不将多个关闭信号放入队列中。在多消费者方案中,您也没有合理的方法来确保特定的消费者收到任何特定物品。任何消费者都可以从中读取队列中的任何项目。

用于信令,尤其是在多个收件人的信号中,更好地使用Event

您还会注意到,启动程序后您的程序似乎很快就会悬挂。那是因为您同时开始使用daemon=True的孩子和线程。

当您的Client.do()方法看起来像上面时,即创建并启动线程,然后退出,您的子过程在呼叫self.KILLQ_THREAD.start()之后立即结束,而守护程序则立即以其结束。您的主要过程没有注意到任何内容,并且继续将 Hello World 纳入队列,直到最终填充并加剧queue.Full

这是一个使用Event的凝结的代码示例,用于在两个子过程中关闭信令,每个线程一个线程。

main.py

import time    
from lib.client import Client
from multiprocessing import Process, Event
class Main:
    def __init__(self):
        self.KILLQ = Event()
        self._clients = (Client(self.KILLQ), Client(self.KILLQ))
        self._procs = [Process(target=cl.do, daemon=True) for cl in self._clients]
        [proc.start() for proc in self._procs]
if __name__ == '__main__':
    m = Main()
    # do sth. else
    time.sleep(1)
    # signal for shutdown
    m.KILLQ.set()
    # grace period for both shutdown prints to show
    time.sleep(.1)

client.py

import multiprocessing
from threading import Thread
class Client:
    def __init__(self, killq):
        self.KILLQ = killq
    def do(self):
        # non-daemonic thread! We want the process to stick around until the thread 
        # terminates on the signal set by the main process
        self.KILLQ_THREAD = Thread(target=self._listen, args=(self.KILLQ,))
        self.KILLQ_THREAD.start()
    @staticmethod
    def _listen(q):
        while not q.is_set():
            print("in thread {}".format(multiprocessing.current_process().name))
        print("{} - master signalled shutdown".format(multiprocessing.current_process().name))

输出

[...]
in thread Process-2
in thread Process-1
in thread Process-2
Process-2 - master signalled shutdown
in thread Process-1
Process-1 - master signalled shutdown
Process finished with exit code 0

至于相互处理通信的方法,您可能需要查看流服务器解决方案。米格尔·格林伯格(Miguel Grinberg(于2014年8月从2014年8月进行了最新的后续播放,撰写了一本关于视频流的出色教程。

相关内容

  • 没有找到相关文章

最新更新