我正试图使用一组计算机来运行数百万个小型模拟。为了做到这一点,我试图建立两个";服务器";在我的主计算机上,一个用于将队列中的输入变量添加到网络,另一个用于处理结果。
这是将东西放入模拟变量队列的代码:
"""This script reads start parameters and calls on run_sim to run the
simulations"""
import time
from multiprocessing import Process, freeze_support, Manager, Value, Queue, current_process
from multiprocessing.managers import BaseManager
class QueueManager(BaseManager):
pass
class MultiComputers(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(MultiComputers, self).__init__()
def get_sim_obj(self, offset, db):
"""returns a list of lists from a database query"""
def handle_queue(self):
self.sim_nr = 0
sims = self.get_sim_obj()
self.total = len(sims)
while len(sims) > 0:
if self.queue.qsize() > 100:
self.queue.put(sims[0])
self.sim_nr += 1
print(self.sim_nr, round(self.sim_nr/self.total * 100, 2), self.queue.qsize())
del sims[0]
def run(self):
self.handle_queue()
if __name__ == '__main__':
freeze_support()
queue = Queue()
w = MultiComputers('seed_1_hundred', queue)
w.start()
QueueManager.register('get_queue', callable=lambda: queue)
m = QueueManager(address=('', 8001), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
然后运行这个队列来处理模拟结果:
__author__ = 'axa'
from multiprocessing import Process, freeze_support, Queue
from multiprocessing.managers import BaseManager
import time
class QueueManager(BaseManager):
pass
class SaveFromMultiComp(Process):
def __init__(self, sim_name, queue):
self.sim_name = sim_name
self.queue = queue
super(SaveFromMultiComp, self).__init__()
def run(self):
res_got = 0
with open('sim_type1_' + self.sim_name, 'a') as f_1:
with open('sim_type2_' + self.sim_name, 'a') as f_2:
while True:
if self.queue.qsize() > 0:
while self.queue.qsize() > 0:
res = self.queue.get()
res_got += 1
if res[0] == 1:
f_1.write(str(res[1]) + 'n')
elif res[0] == 2:
f_2.write(str(res[1]) + 'n')
print(res_got)
time.sleep(0.5)
if __name__ == '__main__':
queue = Queue()
w = SaveFromMultiComp('seed_1_hundred', queue)
w.start()
m = QueueManager(address=('', 8002), authkey=b'abracadabra')
s = m.get_server()
s.serve_forever()
这些脚本按预期工作,用于处理前7-800次模拟,之后我在终端运行接收结果脚本时出现以下错误:
Exception in thread Thread-1:
Traceback (most recent call last):
File "C:Python35libthreading.py", line 914, in _bootstrap_inner
self.run()
File "C:Python35libthreading.py", line 862, in run
self._target(*self._args, **self._kwargs)
File "C:Python35libmultiprocessingmanagers.py", line 177, in accepter
t.start()
File "C:Python35libthreading.py", line 844, in start
_start_new_thread(self._bootstrap, ())
RuntimeError: can't start new thread
有人能深入了解线程在哪里以及如何生成吗?每次调用queue.get()
时都会生成一个新线程吗?或者它是如何工作的?如果有人知道我能做些什么来避免这次失败,我会很高兴?(我正在使用Python3.5-32运行脚本(
所有迹象都表明系统没有启动线程所需的资源(可能是内存,但可能是线程或其他资源泄漏(。您可以使用操作系统监控工具(top
适用于Linux,Resource Monitor
适用于windows(来查看线程数量和内存使用情况,以跟踪这一情况,但我建议您只使用更简单、更高效的编程模式。
虽然这不是一个完美的比较,但您通常会看到C10K问题,它表明等待结果的阻塞线程不能很好地扩展,并且可能容易出现这样的泄漏错误。解决方案是实现异步IO模式(一个启动其他工作程序的阻塞线程(,这在Web服务器中是非常直接的。
像pythonsaiohttp
这样的框架应该非常适合您的需求。您只需要一个可以获取远程代码ID和结果的处理程序。希望该框架能够为您处理扩展问题。
因此,在您的情况下,您可以保留启动代码,但在远程机器上启动进程后,终止线程。让远程代码向服务器发送一条HTTP消息,其中包含1(其ID和2(其结果。如果它没有得到200的"OK"状态代码,那么添加一点额外的代码让它再试一次,你的状态应该会好得多。
我认为您的系统必须运行许多线程。我会先检查你的系统资源,然后重新思考我的程序。试着限制你的线程并尽可能少地使用。