多处理事件队列未更新



所以我正在编写一个带有事件系统的程序。我有一份要处理的事件清单。一个进程应该将新事件推送到处理程序列表中。当我在推送一个事件后试图打印出要处理的列表时,这部分似乎起了作用。它变得越来越长,而当我在handle事件方法中打印出to handle列表时,它一直是空的。

这是我的事件处理程序代码:

class Event_Handler:
def __init__(self):
self._to_handle_list = [deque() for _ in range(Event_Prio.get_num_prios()) ]            
self._controll_handler= None
self._process_lock = Lock() 

def init(self, controll_EV_handler):
self._controll_handler= controll_EV_handler

def new_event(self, event):          #adds a new event to list
with self._process_lock:
self._to_handle_list[event.get_Prio()].append(event) #this List grows 


def handle_event(self):         #deals with the to_handle_list
self._process_lock.acquire()

for i in range(Event_Prio.get_num_prios()):  #here i keep a list of empty deque
print(self._to_handle_list)
if (self._to_handle_list[i]): #checks if to-do is empty, never gets here that its not
self._process_lock.release()
self._controll_handler.controll_event(self._to_handle_list[i].popleft())
return
self._process_lock.release()

def create_Event(self, prio, type):
return Event(prio, type)

我什么都试过了。我检查了两个进程的事件处理程序id是否相同(加上锁有效(我甚至检查了两种方法的to handle列表id是否相同;是的。一个过程中的一个仍然在增长,而另一个是空的。有人能告诉我为什么这个单子是空的吗?

编辑:如果我只通过一个进程在系统中抛出一个事件,它就可以正常工作。与多处理有关

编辑:因为有人问,这里有一个简单的用例(我只使用了要点(:

class EV_Main():
def __init__(self):
self.e_h = Event_Handler()
self.e_controll = None  #the controller doesnt even matter because the controll-function never gets called....list is always empty 

def run(self):

self.e_h.init(self.e_controll)
process1 = Process(target = self.create_events)
process2 = Process(target = self.handle_events)
process1.start()
process2.start()
def create_events(self):
while True:
self.e_h.new_event(self.e_h.create_Event(0, 3))    # eEvent_Type.S_TOUCH_EVENT
time.sleep(0.3)
def handle_events(self):
while True:
self.e_h.handle_event()
time.sleep(0.1)

要拥有一组可共享的deque实例,您可以创建一个特殊的类DequeArray,它将包含deque实例的内部列表,并公开您可能需要的任何方法。然后我会把它变成一个可共享的、管理的对象。当管理器创建此类的实例时,返回的是驻留在管理器地址空间中的实际实例的代理。您在此代理上进行的任何方法调用实际上都会使用pickle发送到管理器的进程,并且以相同的方式返回任何结果。由于单个deque实例是不可共享的托管对象,请不要添加返回其中一个deque实例的方法,然后在不知道管理器地址空间中的deque版本已被而未被修改的情况下对其进行修改。

deque上的各个操作被序列化。但是,如果您正在对由deque上的多个方法调用组成的deque执行某些操作,并且您需要原子性,则该序列是一个关键部分,需要在锁的控制下完成,如下面的left_rotate函数中所示。

from multiprocessing import Process, Lock
from multiprocessing.managers import BaseManager
from collections import deque
# Add methods to this as required:
class DequeArray:
def __init__(self, array_size):
self._deques = [deque() for _ in range(array_size)]
def __repr__(self):
l = []
l.append('DequeArray [')
for d in self._deques:
l.append('    ' + str(d))
l.append(']')
return 'n'.join(l)
def __len__(self):
"""
Return our length (i.e. the number of deque
instances we have).
"""
return len(self._deques)
def append(self, i, value):
"""
Append value to the ith deque
"""
self._deques[i].append(value)
def popleft(self, i):
"""
Eexcute a popleft operation on the ith deque
and return the result.
"""
return self._deques[i].popleft()
def length(self, i):
"""
Return length of the ith dequeue.
"""
return len(self._deques[i])

class DequeArrayManager(BaseManager):
pass
DequeArrayManager.register('DequeArray', DequeArray)
# Demonstrate how to use a sharable DequeArray
def left_rotate(deque_array, lock, i):
# Rotate first element to be last element:
# This is not an atomic operation, so do under control of a lock:
with lock:
deque_array.append(i, deque_array.popleft(i))

# Required for Windows:
if __name__ == '__main__':
# This starts the manager process:
with DequeArrayManager() as manager:
# Two deques:
deque_array = manager.DequeArray(2)
# Initialize with some values:
deque_array.append(0, 0)
deque_array.append(0, 1)
deque_array.append(0, 2)
# Same values in second deque:
deque_array.append(1, 0)
deque_array.append(1, 1)
deque_array.append(1, 2)
print(deque_array)
# Both processses will be modifying the same deque in a
# non-atomic way, so we definitely need to be doing this under
# control of a lock. We don't care which process acquires the
# lock first because the results will be the same regardless.
lock = Lock()
p1 = Process(target=left_rotate, args=(deque_array, lock, 0))
p2 = Process(target=left_rotate, args=(deque_array, lock, 0))
p1.start()
p2.start()
p1.join()
p2.join()
print(deque_array)

打印:

DequeArray [
deque([0, 1, 2])
deque([0, 1, 2])
]
DequeArray [
deque([2, 0, 1])
deque([0, 1, 2])
]

最新更新