Python:来自多个线程(或信号)的 Queue.get()




如何在 Python 中从多个线程使用 Queue.get((

我想做的是:一个线程使用 Queue.put(xxx( 发送数据,并且一些线程获得相同的数据。这个想法就像">信号"。我想在没有 PyQt 的情况下做到这一点。

例如:

#!/usr/bin/python
import threading
import Queue
queue= Queue.Queue()
def Func1():
  while True:
    data= queue.get()
    print 'Func1:got',data
    if data=='q':  break
def Func2():
  while True:
    data= queue.get()
    print 'Func2:got',data
    if data=='q':  break
def MainThread():
  while True:
    data= raw_input('q to quit > ')
    queue.put(data)
    if data=='q':  break
t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()
t1.join()
t2.join()
tm.join()

在这里,我希望 Func1 和 Func2 从 MainThread 获取相同的数据,但只有 Func1 和 Func2 中的一个可以获取数据。

如果你有一个好主意,请告诉我。

非常感谢!


编辑于 2014-12-19 12:51 EST

基于Reut Sharabani的想法,我实现了一个信号类。

#!/usr/bin/python
import threading
import Queue
class TSignal:
  def __init__(self):
    self.queues= {}  #Map from index to queue
    self.counter= 0
    self.locker= threading.Lock()
  def NewQueue(self):
    with self.locker:
      idx= self.counter
      self.counter+= 1
      self.queues[idx]= Queue.Queue()
    queue= self.TQueue(self,idx,self.queues[idx])
    return queue
  def DeleteQueue(self,idx):
    with self.locker:
      del self.queues[idx]
  def put(self,item,block=True,timeout=None):
    for idx,queue in self.queues.iteritems():
      queue.put(item,block,timeout)
  class TQueue:
    def __init__(self,parent,idx,queue):
      self.parent= parent
      self.idx= idx
      self.queue= queue
    def __enter__(self):
      return self
    def __exit__(self,e_type,e_value,e_traceback):
      self.parent.DeleteQueue(self.idx)
    def get(self,block=True,timeout=None):
      return self.queue.get(block,timeout)
signal= TSignal()
def Func1():
  with signal.NewQueue() as queue:
    while True:
      data= queue.get()
      print 'nFunc1:got[%r]n'%data
      if data=='q':  break
def Func2():
  with signal.NewQueue() as queue:
    while True:
      data= queue.get()
      print 'nFunc2:got[%r]n'%data
      if data=='q':  break
def MainThread():
  while True:
    data= raw_input('q to quit > ')
    signal.put(data)
    if data=='q':  break
t1= threading.Thread(name='func1', target=Func1)
t2= threading.Thread(name='func2', target=Func2)
tm= threading.Thread(name='main', target=MainThread)
t1.start()
t2.start()
tm.start()
t1.join()
t2.join()
tm.join()

Tignal的用法非常简单。 在 getter 函数中,插入一个 with 语句,如下所示:

with signal.NewQueue() as queue:

然后以与 Queue.get(( 相同的方式使用 queue:

data= queue.get()

在推杆函数中,只需使用 put 与 Queue.put(( 相同:

signal.put(data)

问题是,如果线程数为 N,则 TSignal 需要维护 N 个队列,而 TSignal.put 实际上调用 Queue.put N 次。所以我仍然想知道是否有更好的主意。

你对此有什么看法吗?

每个线程可以使用队列吗?如果是这样,您可以简单地使用自己的队列发布到每个线程:

#!/usr/bin/python
import threading
import Queue
queue1 = Queue.Queue()
queue2 = Queue.Queue()

def func1():
    while True:
        data = queue1.get()
        print 'Func1:got', data
        if data == 'q':
            break

def func2():
    while True:
        data = queue2.get()
        print 'Func2:got', data
        if data == 'q':
            break

def main():
    while True:
        data = raw_input('q to quit > ')
        queue1.put(data)
        queue2.put(data)
        if data == 'q':
            break

t1 = threading.Thread(name='func1', target=func1)
t2 = threading.Thread(name='func2', target=func2)
tm = threading.Thread(name='main', target=main)
t1.start()
t2.start()
tm.start()
t1.join()
t2.join()
tm.join()

编辑:

对于您在评论中的后续问题,这里有一种具有固定数量的同步原语的机制。这个想法是使用函数和消息创建任务,并将它们提交到线程池中执行。(注意:python 3 有障碍,如果你选择另一个实现,这里可能会变得很方便(:

#!/usr/bin/python
import threading
import Queue
from multiprocessing.pool import ThreadPool
MAX_THREADS = 10
publish_queue = Queue.Queue()
print_lock = threading.Lock()

def sync_print(msg):
    print_lock.acquire()
    print msg
    print_lock.release()
# the manager actually holds a pool of threads
# he gives tasks to. The tasks are the functions you mean
# to execute zipped with the message.
def manager(functions):
    pool = ThreadPool(min(len(functions), MAX_THREADS))
    while True:
        sync_print("Manager waiting for message")
        message = publish_queue.get()
        sync_print("Manager got message %s" % message)
        if message == 'q':
            pool.close()
            pool.terminate()
            break;
        else:
            # create tasks of form: (function, message)
            tasks = zip(functions, [message] * len(functions))
            pool.map(lambda x: x[0](x[1]), tasks)

def func1(data):
    sync_print('%s:got %s' % (threading.current_thread().name, data))

def func2(data):
    sync_print('%s:got %s' % (threading.current_thread().name, data))

def main():
    while True:
        data = raw_input('q to quit > ')
        # wait for all threads to consume
        publish_queue.put(data)
        if data == 'q':
            break
# the functions you want to execute on each message - these were your threads
functions = [
    func1,
    func2
]
main = threading.Thread(name='main', target=main)
manager = threading.Thread(name='manager', target=manager, args=(functions, ))
manager.start()
main.start()
main.join()

希望这适合您的情况,因为它可能会阻塞大量处理时间。

相关内容

  • 没有找到相关文章