我正在尝试使用python中的队列,这将是多线程的。我只是想知道我使用的方法是否正确。如果我在做一些多余的事情,或者我应该使用更好的方法。
我正在尝试从表中获取新的请求,并使用一些逻辑来调度它们来执行一些操作,如运行查询。
所以在主线程中,我为队列生成了一个单独的线程。
if __name__=='__main__':
request_queue = SetQueue(maxsize=-1)
worker = Thread(target=request_queue.process_queue)
worker.setDaemon(True)
worker.start()
while True:
try:
#Connect to the database get all the new requests to be verified
db = Database(username_testschema, password_testschema, mother_host_testschema, mother_port_testschema, mother_sid_testschema, 0)
#Get new requests for verification
verify_these = db.query("SELECT JOB_ID FROM %s.table WHERE JOB_STATUS='%s' ORDER BY JOB_ID" %
(username_testschema, 'INITIATED'))
#If there are some requests to be verified, put them in the queue.
if len(verify_these) > 0:
for row in verify_these:
print "verifying : %s" % row[0]
verify_id = row[0]
request_queue.put(verify_id)
except Exception as e:
logger.exception(e)
finally:
time.sleep(10)
现在,在Setqueue类中,我有一个process_queue函数,用于处理每次运行中添加到队列中的前2个请求。
'''
Overridding the Queue class to use set as all_items instead of list to ensure unique items added and processed all the time,
'''
class SetQueue(Queue.Queue):
def _init(self, maxsize):
Queue.Queue._init(self, maxsize)
self.all_items = set()
def _put(self, item):
if item not in self.all_items:
Queue.Queue._put(self, item)
self.all_items.add(item)
'''
The Multi threaded queue for verification process. Take the top two items, verifies them in a separate thread and sleeps for 10 sec.
This way max two requests per run will be processed.
'''
def process_queue(self):
while True:
scheduler_obj = Scheduler()
try:
if self.qsize() > 0:
for i in range(2):
job_id = self.get()
t = Thread(target=scheduler_obj.verify_func, args=(job_id,))
t.start()
for i in range(2):
t.join(timeout=1)
self.task_done()
except Exception as e:
logger.exception(
"QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
finally:
time.sleep(10)
我想看看我的理解是否正确,是否有任何问题。
所以主线程运行,而True在主线程连接到数据库得到新的请求,并把它放在队列中。队列的工作线程(守护进程)继续从队列中获取新请求,并分叉非守护进程线程进行处理,由于join的超时为1,工作线程将继续接受新请求而不会被阻塞,它的子线程将继续在后台处理。正确吗?
因此,如果主进程退出,这些不会被杀死,直到他们完成他们的工作,但工作守护线程将退出。疑问句:如果父进程是守护进程,子进程不是守护进程,如果父进程退出,子进程是否退出?
我也读到这里:- David beazley multiprocessing
由david beazley在使用池作为线程协处理器部分,他试图解决一个类似的问题。所以我应该跟随他的脚步吗1. 创建一个进程池。2. 打开一个线程,就像我为request_queue所做的那样3.在那个线程中
def process_verification_queue(self):
while True:
try:
if self.qsize() > 0:
job_id = self.get()
pool.apply_async(Scheduler.verify_func, args=(job_id,))
except Exception as e:
logger.exception("QUEUE EXCEPTION : Exception occured while processing requests in the VERIFICATION QUEUE")
从池中使用一个进程并并行运行verify_func。这会给我更多的表现吗?
虽然可以为队列创建一个新的独立线程,并以您正在做的方式单独处理该数据,但我认为每个独立的工作线程将消息发布到他们已经"知道"的队列中更为常见。有关。然后,其他线程通过从该队列中提取消息来处理该队列。
设计理念
我设想的应用程序应该是三个线程。主线程和两个工作线程。1个工作线程将从数据库获取请求并将其放入队列中。另一个工作线程将处理队列
中的数据。主线程将通过使用线程函数。join()等待其他线程结束
你可以保护线程可以访问的队列,并通过使用互斥锁使其线程安全。我在其他语言的许多其他设计中也看到过这种模式。
推荐阅读
"有效Python"关于这个问题,布莱特·斯拉特金有一个很好的例子。
他没有从Queue继承,而是在他的类中为Queue创建了一个包装器调用MyQueue,并添加get()和put(message)函数。
他甚至在他的Github repo中提供了源代码
https://github.com/bslatkin/effectivepython/blob/master/example_code/item_39.py我不是这本书的作者,但我强烈推荐这本书,因为我从这本书中学到了很多东西。
我喜欢这个对优点的解释& &;使用线程和进程的区别"…但也有一线希望:进程可以同时在多个执行线程上取得进展。由于父进程不与其子进程共享GIL,因此所有进程都可以同时执行(受硬件和操作系统的约束)...."
他有一些很好的解释如何绕过GIL和如何提高性能
阅读更多:
http://jeffknupp.com/blog/2013/06/30/pythons-hardest-problem-revisited/