入队和出队的队列时间复杂性的有效实现



我目前正在阅读一本关于数据结构/算法的教科书。其中一个练习是使用python列表结构实现一个高效的队列:入队和出队的时间复杂度平均需要为O(1)。这本书说,对于出列的特定情况,时间复杂性应该只为O(n),其余时间应该为O(1)。我实现了它,使得队列的后面是列表的末尾,队列的前面是列表的开头;当我将一个元素出列时,我不会将其从列表中删除,但我只是增加一个计数器,这样该方法就会知道列表中的哪个元素代表队列的前面。这是我的代码:

class FasterQueue:
def __init__(self):
self.items = []
self.index = 0
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
index = self.index
self.index += 1
return self.items[index]
def isEmpty(self):
return self.items == []
def size(self):
return len(self.items)

我的问题是:书中说,在某些情况下,出列应该取O(1)。我不知道这是什么情况,因为出列似乎总是在某个索引处获得值。我的队列实现是无效的,还是缺少其他东西?还是教科书只是在寻找另一种更常见的实现方式?

非常感谢你的帮助。

如果要使用更多Python风格的功能,我会做一些类似的事情:

class FasterQueue:
def __init__(self):
self.items = []
self.index = 0
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
# do this first so IndexErrors don't cause us ignore items
obj = self.items[self.index]
# release the reference so we don't "leak" memory
self.items[self.index] = None
self.index += 1
return obj
def isEmpty(self):
return self.index == len(self.items)
def size(self):
return len(self.items)
def try_shrink(self):
nactive = len(self.items) - self.index
if nactive + 2 < self.index // 2:
self.items = self.items[self.index:]
self.index = 0

添加了一个try_shrink方法,该方法试图释放已使用的内存空间,在dequeue()结束时调用它可能会很有用——否则列表将任意增长,并浪费大量内存。其中的常数并不惊人,但应该可以防止它过于频繁地收缩。这个操作将是O(n),可能是所暗示的

根据我的理解,入队应该在最后插入,出队应该从开始删除。所以代码应该是

class FasterQueue:
def __init__(self):
self.items = []
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
if self.items:
return self.items.pop(0)
print("Underflow")
def isEmpty(self):
return self.items == []
def size(self):
return len(self.items)

O(n)是管理列表长度的必要结果。

这里有一个解决方案。通常,它以O(1)的形式运行,并且由于出列方法内部发生的额外步骤,它不时地是O(n)。

当列表增长过大并触发清理时,会出现O(n)步骤。请注意,通常情况下,这应该在出列方法内部进行。在外面做,往往会更精细,效率更低。

class FasterQueue:
def __init__(self, maxwaste=100):
self.items = []
self.nout = 0
self.maxwaste = maxwaste
def enqueue(self, item):
self.items.append(item)
def dequeue(self):
if len(self.items):
retv = self.items[self.nout]
self.nout += 1
if self.nout >= self.maxwaste:
self.items = self.items[self.nout:]
self.nout =0
return retv
else:
print( 'empty' )
raise ValueError
def listQ(self):
return ' '.join( self.items[self.nout:] )
def isEmpty(self):
return self.nout == len(self.items)
def size(self):
return len(self.items) - self.nout
q = FasterQueue(5)
for n in range(10):
q.enqueue( str(n) )
print( 'queue size %d  nout %d items %s'%(q.size(),q.nout,q.listQ()) )
print( q.items )
while True:
try:
print( 'dequeue %s'%q.dequeue() )
print( 'queue size %d  nout %d items %s'%(q.size(),q.nout,q.listQ()) )
print( q.items )
except:
print( 'empty' )
break

运行上面的代码会产生以下输出,注意当超过maxwaste时浪费的内存的恢复情况。Maxwaste在这里设置得很小,用于演示操作。

queue size 10  nout 0 items 0 1 2 3 4 5 6 7 8 9
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
dequeue 0
queue size 9  nout 1 items 1 2 3 4 5 6 7 8 9
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
dequeue 1
queue size 8  nout 2 items 2 3 4 5 6 7 8 9
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
dequeue 2
queue size 7  nout 3 items 3 4 5 6 7 8 9
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
dequeue 3
queue size 6  nout 4 items 4 5 6 7 8 9
['0', '1', '2', '3', '4', '5', '6', '7', '8', '9']
dequeue 4
queue size 5  nout 0 items 5 6 7 8 9
['5', '6', '7', '8', '9']
dequeue 5
queue size 4  nout 1 items 6 7 8 9
['5', '6', '7', '8', '9']
dequeue 6
queue size 3  nout 2 items 7 8 9
['5', '6', '7', '8', '9']
dequeue 7
queue size 2  nout 3 items 8 9
['5', '6', '7', '8', '9']
dequeue 8
queue size 1  nout 4 items 9
['5', '6', '7', '8', '9']
dequeue 9
queue size 0  nout 0 items 
[]
empty
empty

为了完整性,这里有一个使用环形缓冲区的答案。

这个例子永远以O(1)的形式运行,但它通过限制队列的大小来实现这一点。如果您希望允许队列动态增长或大小,那么您也可以使用O(n)行为。

换句话说,只有当您没有以某种方式管理列表的大小时,才有O(1)。这就是问题的重点。

好的,这是具有固定队列长度的环形缓冲区实现。

class FasterQueue:
def __init__(self, nsize=100):
self.items = [0]*nsize
self.nin = 0
self.nout = 0
self.nsize = nsize
def enqueue(self, item):
next = (self.nin+1)%self.nsize
if next != self.nout:
self.items[self.nin] = item
self.nin = next
print self.nin, item
else:
raise ValueError
def dequeue(self):
if self.nout != self.nin:
retv = self.items[self.nout]
self.nout = (self.nout+1)%self.nsize
return retv
else:
raise ValueError
def printQ(self):
if self.nout < self.nin:
print( ' '.join(self.items[self.nout:self.nin]) )
elif self.nout > self.nin:
print( ' '.join(self.items[self.nout:]+self.items[:self.nin]) )
def isEmpty(self):
return self.nin == self.nout
def size(self):
return (self.nin - self.nout + self.nsize)%self.nsize
q = FasterQueue()
q.enqueue( 'a' )
q.enqueue( 'b' )
q.enqueue( 'c' )
print( 'queue items' )
q.printQ()
print( 'size %d'%q.size() )

while True:
try:
print( 'dequeue %s'%q.dequeue() )
print( 'queue items' )
q.printQ()
except:
print( 'empty' )
break

最新更新