从父进程调用 GMainLoop.quit() 用于在子进程中运行的主循环



我需要运行一个gstreamer管道来执行视频流。GStreamer 管道需要一个具有run()方法的GObject.MainLoop对象,该方法在调用quit()之前不会终止。 为此,我从我的主应用程序进程(P1)创建了一个进程(P2),该进程在其主线程中运行GObject.MainLoop实例。问题是循环在进程 P2 中无限期地进行,我无法从主应用程序进程 (P1) 中退出/退出它。

下面是可能有助于理解该方案的代码部分。

'''
start() spawns a new process P2 that runs Mainloop within its main thread.
stop() is called from P1, but does not quit the Mainloop. This is probably because 
processes do not have shared memory
'''
from multiprocessing import Process
import gi
from gi.repository import GObject
class Main:
def __init__(self):
self.process = None
self.loop = GObject.MainLoop()
def worker(self):
self.loop.run()
def start(self):
self.process=Process(target=self.worker, args=())
self.process.start()
def stop(self):
self.loop.quit()

接下来,我尝试使用多处理队列在进程之间共享"loop"变量,但仍然无法退出主循环。

'''
start() spawns a new process and puts the loop object in a multiprocessing Queue
stop() calls get() from the loop and calls the quit() method, though it still does not quit the mainloop.
'''
from multiprocessing import Process, Queue
import gi
from gi.repository import GObject
class Main:
def __init__(self):
self.p=None
self.loop = GObject.MainLoop()
self.queue = Queue()
def worker(self):
self.queue.put(self.loop)
self.loop.run()

def start(self):
self.p=Process(target=self.worker, args=())
self.p.start()
def stop(self):
# receive loop instance shared by Child Process
loop=self.queue.get()
loop.quit()

如何调用只能在子进程 P2 中访问的 MainLoop 对象的 quit 方法?

好的,首先我们需要使用线程而不是进程。 进程将位于不同的地址空间中。

进程和线程有什么区别?

尝试将主循环对象传递给执行实际工作的单独线程。 这将使您的主要方法只不过是一个基本的 GLib 事件处理循环,但这很好,并且是许多 GLib 应用程序中的正常行为。

最后,我们需要处理子进程在主循环激活之前完成其工作的竞争条件。 我们使用while not loop.is_running()代码段执行此操作。

from threading import Thread
import gi
from gi.repository import GObject
def worker(loop):
while not loop.is_running():
print("waiting for loop to run")
print("working")
loop.quit()
print("quitting")
class Main:
def __init__(self):
self.thread = None
self.loop = GObject.MainLoop()
def start(self):
self.thread=Thread(target=worker, args=(self.loop,))
self.thread.start()
self.loop.run()
def main():
GObject.threads_init()
m = Main()
m.start()
if  __name__ =='__main__' : main()

我在类Main中扩展了multiprocessing.Process模块,并覆盖了它的run()方法,以便在另一个线程(T1)而不是其main thread中实际运行GObject.Mainloop实例。然后实现了一个等待通知机制,该机制将使进程(P2)的main thread进入循环wait-notify并使用multiprocessing.Queue将消息转发到P2的主线程,P2将同时得到通知。例如,stop()方法,它将quit消息发送到在重写的run()方法中为其定义了处理程序P2。 可以扩展此模块以将任意数量的消息解析为Child Process,前提是还要定义其处理程序。

以下是我使用的代码片段。

from multiprocessing import Process, Condition, Queue
from threading import Thread
import gi
from gi.repository import GObject
loop=GObject.MainLoop()
def worker():
loop.run()
class Main(Process):
def __init__(self, target=None, args=()):
self.target=target
self.args=tuple(args)
print self.args
self.message_queue = Queue()
self.cond = Condition()
self.thread = None
self.loop = GObject.MainLoop()
Process.__init__(self)
def run(self):
if self.target:
self.thread = Thread(target=self.target, args=())
print "running target method"
self.thread.start()
while True:
with self.cond:
self.cond.wait()
msg = self.message_queue.get()
if msg == 'quit':
print loop.is_running()
loop.quit()
print loop.is_running()
break
else:
print 'message received', msg
def send_message(self, msg):
self.message_queue.put(msg)
with self.cond:
self.cond.notify_all()
def stop(self):
self.send_message("quit")
self.join()
def func1(self):
self.send_message("msg 1") # handler is defined in the overridden run method
# few others functions which will send unique messages to the process, and their handlers 
# are defined in the overridden run method above

这种方法对我的场景效果很好,但如果有更好的方法,欢迎提出建议。

最新更新