Joinable PriorityQueue in python's asyncio



根据文档,有几种队列实现。我感兴趣的是JoinableQueue和PriorityQueue,因为我想要一个具有优先级的可连接队列。

看来我只能得到较低版本中的一个功能。例如,在3.5中,我可以与Queue(这是可连接的)和PriorityQueue进行区分,但在python 3.5以下有JoinableQueue和PriorityQueue(见下面的例子)。

是否有可能将它们结合起来以获得3.4中可连接的PriorityQueue的一般方法?

try:
    # Python 3.4.
    from asyncio import JoinableQueue as Queue # joinable
    from asyncio import PriorityQueue          # I assume this one is not joinable
except ImportError:
    # Python 3.5.
    from asyncio import Queue                  # standard joinable
    from asyncio import PriorityQueue          # I assume this is the one I want

另一种方法可能是影响Queue?

由于JoinableQueuePriorityQueue的实现方式,只要先列出JoinableQueue,就可以通过多重继承对两者进行子类化来获得JoinablePriorityQueue

这个工作的原因是PriorityQueue是非常简单的实现:

class PriorityQueue(Queue):
    """A subclass of Queue; retrieves entries in priority order (lowest first).
    Entries are typically tuples of the form: (priority number, data).
    """
    def _init(self, maxsize):
        self._queue = []
    def _put(self, item, heappush=heapq.heappush):
        heappush(self._queue, item)
    def _get(self, heappop=heapq.heappop):
        return heappop(self._queue)

虽然JoinableQueue更复杂,但它和PriorityQueue实现的唯一方法是_put,关键的是,JoinableQUeue在自己的put实现中调用super()._put(..),这意味着它将与PriorityQueue正常合作。

下面是演示它工作的示例:

from asyncio import PriorityQueue, JoinableQueue
import asyncio
import random
class JoinablePriorityQueue(JoinableQueue, PriorityQueue):
    pass

@asyncio.coroutine
def consume(q):
    while True:
        a = yield from q.get()
        print("got a {}".format(a))
        if a[1] is None:
            q.task_done()
            return
        asyncio.sleep(1)
        q.task_done()
@asyncio.coroutine
def produce(q):
    for i in range(10):
        yield from q.put((random.randint(0,10), i))
    yield from q.put((100, None)) # Will be last
    asyncio.async(consume(q))
    print("waiting...")
    yield from q.join()
    print("waited")
loop = asyncio.get_event_loop()
q = JoinablePriorityQueue()
loop.run_until_complete(produce(q))
输出:

waiting...
got a (1, 2)
got a (2, 1)
got a (4, 4)
got a (5, 0)
got a (6, 8)
got a (6, 9)
got a (8, 3)
got a (9, 5)
got a (9, 7)
got a (10, 6)
got a (100, None)
waited

最新更新