如何根据对象的属性对多进程队列进行排序



使用常规列表,我可以根据对象属性对列表进行排序:

queue.sort(key=lambda weed: (weed.x_coord), reverse=True)

但是,对于多处理队列,这是不可能的,那么如何使用多处理队列完成相同的排序呢?或者,如果我想在最后对队列进行排序,最好避免使用多进程队列?

要求队列/列表应该是线程安全的和进程安全的,因为队列/列表将由两个并行运行的线程填充。

将对象插入共享队列的两个进程(p1和p2(将继续与从队列中读取的第三个进程(状态机(一起运行(请参阅下面的代码(。I.e状态机进程将而不是等待p1和p2进程结束。

迄今为止的实施情况:

import multiprocessing
class Weed():
x=None
y=None
def __init__(self,x,y):
self.x=x
self.y=y
def p1(q):
"""
Function that inserts weed in the shared queue
"""
# append squares of mylist to queue
q.put(Weed(10.1,7.3))
q.put(Weed(8.3,2.8))
q.put(Weed(5.1,4.2))
q.put(Weed(15.4,5.0))
def p2(q):
"""
Function that inserts weed in the shared queue
"""
# append squares of mylist to queue
q.put(Weed(25.1,1))
q.put(Weed(1.3,1))
q.put(Weed(9.1,1))
q.put(Weed(13.4,1))

def state_machine(q):
"""
Function that sorts the queue (w.r.t x-coord.) and prints it out
"""
print("Queue elements:")
while not q.empty():
q.sort(key=lambda x: (x.x), reverse=True) # Gives error - 
print(q.get().x)
print("Queue is now empty!")
if __name__ == "__main__":
# creating multiprocessing Queue
q = multiprocessing.Queue()
# creating new processes
p1 = multiprocessing.Process(target=p1, args=(q,))
p2 = multiprocessing.Process(target=p2, args=(q,))
p3 = multiprocessing.Process(target=state_machine, args=(q,))
# running process p1 to generate some weeds
p1.start()

# running process p2 to generate some weeds
p2.start()

# running process p3 to sort the weed queue (by x coord.) and print them out
p3.start()

p1.join()
p2.join()
p3.join()

在您的示例中,这3个进程不是并发运行的(您启动它们并在启动下一个进程之前加入它们(,我假设现实世界中的情况确实具有并发性。

不过要小心:在现实世界中,空队列并不意味着其他任务已经完成。您需要另一个同步机制。

我的建议是回到state_machine函数内的常规列表,并在元素到达时将它们从多处理队列转移到列表。然后可以对列表进行排序并按顺序打印元素。您不会有并发问题,因为内部列表只由运行state_machine的线程修改。

def state_machine(q):
"""
Function that sorts the queue (w.r.t x-coord.) and prints it out
"""
print("Queue elements:")
internal_queue = []
# here, we assume that all tasks have finished working.
# if it is not the case, you should add a barrier to wait
while not q.empty():
internal_queue.append(q.get())
internal_queue.sort(key=lambda item: (item.x), reverse=True)
for elt in internal_queue:
print(elt.x)
print("Queue is now empty!")

程序打印:

Queue elements:
25.1
15.4
13.4
10.1
9.1
8.3
5.1
1.3
Queue is now empty!

[EDIT]

在现实世界中,您不希望等到消费者完成后再开始打印。然而,您必须在两个问题之间找到折衷方案:

  • 如果你在开始消费元素之前等待太久,你基本上又回到了等待生产者完成
  • 如果您消耗队列中的元素太快(即,一旦它们到达就立即打印(,那么您的队列在大多数时间都将是空的,排序就没有多大意义了

这里没有(IMHO(最优解决方案,这里有一个命题,内部队列定期更新,同时给生产商完成工作的时间:

def state_machine(q):
"""
Function that sorts the queue (w.r.t x-coord.) and prints it out
"""
internal_queue = []
def update_internal_queue():
while not q.empty():
internal_queue.append(q.get())
internal_queue.sort(key=lambda item: (item.x), reverse=True)

# wait a bit for the beginning of the elements to arrive
time.sleep(5)
update_internal_queue()
print("Queue elements:")
while internal_queue:
time.sleep(1) # we can optionally wait a bit before each print
update_internal_queue() # see if other elements arrived in the meantime
print(internal_queue.pop(0).x)
print("Queue is now empty!")

最新更新