Multiprocessing priority queue in Python does not always return items in the correct priority order



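The values returned by .get() from my priority queue are 100% correctly sorted about 70% of the time, but about 30% of the time they are only about 70% correctly sorted: most elements are in the right place, but a few are shuffled out of order. Most of the time, when the order is wrong, the first item returned from the priority queue is the incorrect one, and it corresponds to the first (or one of the first) items placed into the priority queue. I suppose that should be a hint about the problem, but I can't tell why.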

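I created the multiprocessing priority queue by following this answer. I tried putting (priority, data) tuples into the priority queue, and I also tried using a @dataclass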

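…same result. I wrote a unit test for the answer mentioned above. If you run it a few times, you should eventually see an assertion error. Maybe the PriorityQueue from that answer is not actually thread/process safe? If so, how can I make it thread/process safe? My "minimal" example is below. I have verified the issue on Python 3.7.13 and 3.10.7.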

"""
Test the PriorityQueue class by making a writer process to write data into the
PriorityQueue and a reader process to read data from the PriorityQueue.
Ensure the data read out of the PriorityQueue is sorted by priority
"""
from __future__ import annotations      # For type hinting the values in PriorityQueue
from queue import Queue, PriorityQueue
from typing import Union, Any
import numpy as np
from multiprocessing import Process
from dataclasses import dataclass, field
from multiprocessing.managers import SyncManager
from functools import total_ordering
# Via: https://stackoverflow.com/a/25328987/4368898
class PQManager(SyncManager):
    pass


PQManager.register("PriorityQueue", PriorityQueue)


def Manager():
    m = PQManager()
    m.start()
    return m


# Not necessary but recommended from:
#   https://docs.python.org/3/library/queue.html#queue.PriorityQueue
@dataclass(order=True)
class PrioritizedItem:
    """
    Prioritized item for priority queue as recommended at:
    https://docs.python.org/3/library/queue.html#queue.PriorityQueue
    """
    priority: float
    data: Any = field(compare=False)

    def __init__(self, priority: float, data: int = 0):
        self.priority = priority
        self.data = data


def writer_function(priority_queue_under_test: PriorityQueue[PrioritizedItem],
                    num_examples: int = 5) -> None:
    """
    Write data to the priority queue with random priorities
    The final item placed in the priority queue will have a priority of 999.0
    (least priority) with data of 12345
    """
    for i in range(num_examples-1):
        # priority_number: float = round(np.random.normal(), 3)
        a = [-0.954,-1.3,-0.785,-1.324,0.576,1.909,-0.296,0.781,-0.209,-0.492]
        priority_number = a[i]
        print(f"Placing into the PriorityQueue: {priority_number}")
        priority_queue_under_test.put(PrioritizedItem(priority_number, 0))
    priority_queue_under_test.put(PrioritizedItem(999.0, 12345))    # Signal end of queue


def reader_function(priority_queue_under_test: PriorityQueue[PrioritizedItem],
                    queue_of_priorities: Queue[float],
                    queue_of_data: Queue[int]):
    """
    Reads values from the priority_queue_under_test and then places the read
    priorities and data into queue_of_priorities and queue_of_data, respectively.
    Will read until it sees data == 12345
    """
    while True:
        if not priority_queue_under_test.empty():
            prioritized_item: PrioritizedItem = priority_queue_under_test.get()
            p = prioritized_item.priority
            d = prioritized_item.data
            # print(f"Read priority {p}")
            queue_of_priorities.put(p)
            queue_of_data.put(d)
            if d == 12345:
                return


if __name__ == "__main__":
    NUM_EXAMPLES: int = 10
    # Make Queues
    manager = Manager()
    # Entries are typically tuples of the form: (priority number, data)
    # Lowest priority number gets popped first
    priority_queue_under_test = manager.PriorityQueue()
    queue_of_priorities = manager.Queue()
    queue_of_data = manager.Queue()
    # Create processes
    writer_process = Process(target=writer_function, args=(priority_queue_under_test,
                                                           NUM_EXAMPLES,))
    reader_process = Process(target=reader_function, args=(priority_queue_under_test,
                                                           queue_of_priorities,
                                                           queue_of_data,))
    # Start processes
    writer_process.start()
    reader_process.start()
    # Wait for processes to end
    writer_process.join()
    reader_process.join()
    # Read priorities and data from the queues
    priorities = []
    data = []
    while not queue_of_priorities.empty():
        priorities.append(queue_of_priorities.get())
        data.append(queue_of_data.get())
        # print(f"Read priority {priorities[-1]}")
    print(f"Priorities of the items in the PriorityQueue as they were popped "
          f"off of the queue via .get():\n\t{priorities}")
    print(f"Data contained in the PriorityQueue:\n\t{data}")
    print(f"Number of items: {len(data)}")
    # Ensure that the queues have been fully read from and the
    #   priorities read off of the priority queue are sorted low to high
    assert queue_of_priorities.empty()
    assert queue_of_data.empty()
    assert len(priorities) == len(data) == NUM_EXAMPLES
    assert sorted(priorities) == priorities
    print("All checks have passed successfully!")

Good, expected output (~70% of the time):

# python3 test_priority_queue.py 
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
[-1.324, -1.3, -0.954, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
All checks have passed successfully!

Bad, not fully sorted output (~30% of the time):

# python3 test_priority_queue.py 
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
[-0.954, -1.324, -1.3, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
Traceback (most recent call last):
  File "test_priority_queue.py", line 129, in <module>
    assert sorted(priorities) == priorities
AssertionError

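An additional bad example. Note that it is again one of the first elements placed into the priority queue that comes out in the wrong order.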

# python3 test_priority_queue.py 
Starting test_priority_queue.py
Placing into the PriorityQueue: -0.954
Placing into the PriorityQueue: -1.3
Placing into the PriorityQueue: -0.785
Placing into the PriorityQueue: -1.324
Placing into the PriorityQueue: 0.576
Placing into the PriorityQueue: 1.909
Placing into the PriorityQueue: -0.296
Placing into the PriorityQueue: 0.781
Placing into the PriorityQueue: -0.209
Priorities of the items in the PriorityQueue as they were popped off of the queue via .get():
[-1.3, -1.324, -0.954, -0.785, -0.296, -0.209, 0.576, 0.781, 1.909, 999.0]
Data contained in the PriorityQueue:
[0, 0, 0, 0, 0, 0, 0, 0, 0, 12345]
Number of items: 10
Traceback (most recent call last):
  File "test_priority_queue.py", line 129, in <module>
    assert sorted(priorities) == priorities
AssertionError

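There always seems to be exactly one element out of the expected order. My guess is that this depends on the (unpredictable) scheduling of the processes. Since the reader and the writer are started at the same time, I think that not all items are in the queue yet when the reader picks the first item to process. As required, it picks the highest-priority item available at that moment. By the next read, the other items have been queued (possibly with higher priority), and from there on they are processed in order.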
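
The interleaving described above can be reproduced deterministically in a single process. The following is only a minimal sketch: simulate_interleaving is a hypothetical helper (not part of the question's code), and it uses a plain queue.PriorityQueue with bare floats instead of the managed queue and PrioritizedItem. The "reader" takes one item after only the first few puts have happened, then drains the queue once the "writer" has finished.

```python
from queue import PriorityQueue

# Priorities in the order the question's writer puts them, followed by the
# 999.0 end-of-queue sentinel.
PRIORITIES = [-0.954, -1.3, -0.785, -1.324, 0.576, 1.909, -0.296, 0.781, -0.209, 999.0]


def simulate_interleaving(priorities, puts_before_first_get):
    """Hypothetical helper: mimic a reader that wins the race early.

    The first `puts_before_first_get` items (must be >= 1) are queued, one
    item is read, then the remaining items are queued and the queue is drained.
    """
    pq = PriorityQueue()
    for p in priorities[:puts_before_first_get]:
        pq.put(p)
    popped = [pq.get()]          # smallest item available *right now*
    for p in priorities[puts_before_first_get:]:
        pq.put(p)
    while not pq.empty():        # safe here: single thread, no concurrent writer
        popped.append(pq.get())
    return popped


early_one = simulate_interleaving(PRIORITIES, 1)
early_two = simulate_interleaving(PRIORITIES, 2)
print(early_one)
print(early_two)
```

With one early put the result matches the first failing run in the question, and with two early puts it matches the second. In both cases every get() returned the smallest item that was in the queue at that moment, so the queue itself behaved correctly.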

When you inject a high-priority item into a queue that is already being processed, this is pretty much what you would expect/want.
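
If the test is really meant to check a fully sorted drain, one option is to make sure the writer has finished before the reader starts. The sketch below is an illustration under assumptions: it uses threads and a plain queue.PriorityQueue with bare floats for brevity, whereas the question uses processes and a managed queue. In the question's script the analogous change would be to call writer_process.join() before reader_process.start().

```python
import threading
from queue import PriorityQueue

PRIORITIES = [-0.954, -1.3, -0.785, -1.324, 0.576, 1.909, -0.296, 0.781, -0.209]
SENTINEL = 999.0  # lowest priority, marks the end of the queue


def writer(pq):
    """Put every priority, then the sentinel."""
    for p in PRIORITIES:
        pq.put(p)
    pq.put(SENTINEL)


def reader(pq, out):
    """Read until the sentinel is seen; get() blocks, so no busy-waiting on empty()."""
    while True:
        p = pq.get()
        out.append(p)
        if p == SENTINEL:
            return


pq = PriorityQueue()
popped = []
writer_thread = threading.Thread(target=writer, args=(pq,))
reader_thread = threading.Thread(target=reader, args=(pq, popped))

writer_thread.start()
writer_thread.join()    # every item is queued before the first read
reader_thread.start()
reader_thread.join()

print(popped)
```

Because the queue is complete before the first get(), the items come out sorted from lowest to highest priority number. The cost is that reading no longer overlaps with writing, so this only makes sense when a globally sorted result is actually required.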
