如何在 Python 中从多个线程使用 Queue.get((?
我想做的是:一个线程使用 Queue.put(xxx( 发送数据,并且一些线程获得相同的数据。这个想法就像">信号"。我想在没有 PyQt 的情况下做到这一点。
例如:
#!/usr/bin/python
import threading
import Queue
queue= Queue.Queue()
def Func1():
while True:
data= queue.get()
print 'Func1:got',data
if data=='q': break
def Func2():
while True:
data= queue.get()
print 'Func2:got',data
if data=='q': break
def MainThread():
while True:
data= raw_input('q to quit > ')
queue.put(data)
if data=='q': break
t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()
t1.join()
t2.join()
tm.join()
在这里,我希望 Func1 和 Func2 从 MainThread 获取相同的数据,但只有 Func1 和 Func2 中的一个可以获取数据。
如果你有一个好主意,请告诉我。
非常感谢!
编辑于 2014-12-19 12:51 EST
基于Reut Sharabani的想法,我实现了一个信号类。
#!/usr/bin/python
import threading
import Queue
class TSignal:
def __init__(self):
self.queues= {} #Map from index to queue
self.counter= 0
self.locker= threading.Lock()
def NewQueue(self):
with self.locker:
idx= self.counter
self.counter+= 1
self.queues[idx]= Queue.Queue()
queue= self.TQueue(self,idx,self.queues[idx])
return queue
def DeleteQueue(self,idx):
with self.locker:
del self.queues[idx]
def put(self,item,block=True,timeout=None):
for idx,queue in self.queues.iteritems():
queue.put(item,block,timeout)
class TQueue:
def __init__(self,parent,idx,queue):
self.parent= parent
self.idx= idx
self.queue= queue
def __enter__(self):
return self
def __exit__(self,e_type,e_value,e_traceback):
self.parent.DeleteQueue(self.idx)
def get(self,block=True,timeout=None):
return self.queue.get(block,timeout)
signal= TSignal()
def Func1():
with signal.NewQueue() as queue:
while True:
data= queue.get()
print 'nFunc1:got[%r]n'%data
if data=='q': break
def Func2():
with signal.NewQueue() as queue:
while True:
data= queue.get()
print 'nFunc2:got[%r]n'%data
if data=='q': break
def MainThread():
while True:
data= raw_input('q to quit > ')
signal.put(data)
if data=='q': break
t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()
t1.join()
t2.join()
tm.join()
Tignal的用法非常简单。 在 getter 函数中,插入一个 with 语句,如下所示:
with signal.NewQueue() as queue:
然后以与 Queue.get(( 相同的方式使用 queue:
data= queue.get()
在推杆函数中,只需使用 put 与 Queue.put(( 相同:
signal.put(data)
问题是,如果线程数为 N,则 TSignal 需要维护 N 个队列,而 TSignal.put 实际上调用 Queue.put N 次。所以我仍然想知道是否有更好的主意。
你对此有什么看法吗?
每个线程可以使用队列吗?如果是这样,您可以简单地使用自己的队列发布到每个线程:
#!/usr/bin/python
import threading
import Queue
queue1 = Queue.Queue()
queue2 = Queue.Queue()
def func1():
while True:
data = queue1.get()
print 'Func1:got', data
if data == 'q':
break
def func2():
while True:
data = queue2.get()
print 'Func2:got', data
if data == 'q':
break
def main():
while True:
data = raw_input('q to quit > ')
queue1.put(data)
queue2.put(data)
if data == 'q':
break
t1 = threading.Thread(name='func1', target=func1)
t2 = threading.Thread(name='func2', target=func2)
tm = threading.Thread(name='main', target=main)
t1.start()
t2.start()
tm.start()
t1.join()
t2.join()
tm.join()
编辑:
对于您在评论中的后续问题,这里有一种具有固定数量的同步原语的机制。这个想法是使用函数和消息创建任务,并将它们提交到线程池中执行。(注意:python 3 有障碍,如果你选择另一个实现,这里可能会变得很方便(:
#!/usr/bin/python
import threading
import Queue
from multiprocessing.pool import ThreadPool
MAX_THREADS = 10
publish_queue = Queue.Queue()
print_lock = threading.Lock()
def sync_print(msg):
print_lock.acquire()
print msg
print_lock.release()
# the manager actually holds a pool of threads
# he gives tasks to. The tasks are the functions you mean
# to execute zipped with the message.
def manager(functions):
pool = ThreadPool(min(len(functions), MAX_THREADS))
while True:
sync_print("Manager waiting for message")
message = publish_queue.get()
sync_print("Manager got message %s" % message)
if message == 'q':
pool.close()
pool.terminate()
break;
else:
# create tasks of form: (function, message)
tasks = zip(functions, [message] * len(functions))
pool.map(lambda x: x[0](x[1]), tasks)
def func1(data):
sync_print('%s:got %s' % (threading.current_thread().name, data))
def func2(data):
sync_print('%s:got %s' % (threading.current_thread().name, data))
def main():
while True:
data = raw_input('q to quit > ')
# wait for all threads to consume
publish_queue.put(data)
if data == 'q':
break
# the functions you want to execute on each message - these were your threads
functions = [
func1,
func2
]
main = threading.Thread(name='main', target=main)
manager = threading.Thread(name='manager', target=manager, args=(functions, ))
manager.start()
main.start()
main.join()
希望这适合您的情况,因为它可能会阻塞大量处理时间。