在类 __init__ 中初始化多进程是否正确?


from multiprocessing.dummy import Pool as ThreadPool
class TSNew:
def __init__(self):
self.redis_client = redis.StrictRedis(host="172.17.31.147", port=4401, db=0)
self.global_switch = 0
self.pool = ThreadPool(40) # init pool
self.dnn_model = None
self.nnf = None
self.md5sum_nnf = "initialize"
self.thread = threading.Thread(target=self.load_model_item)
self.ts_picked_ids = None
self.thread.start()
self.memory = deque(maxlen=3000)
self.process = threading.Thread(target=self.process_user_dict)
self.process.start()
def load_model_item(self):
'''
code
'''
def predict_memcache(self,user_dict):
'''
code
'''
def process_user_dict(self):
while True:
'''
code to generate user_dicts which is a list 
'''
results = self.pool.map(self.predict_memcache, user_dicts)
'''
code
'''
TSNew_ = TSNew()
def get_user_result():
logging.info("----------------come in ------------------")
if request.method == 'POST':
user_dict_json = request.get_data()# userid
if user_dict_json == '' or user_dict_json is None:
logging.info("----------------user_dict_json is ''------------------")
return ''
try:
user_dict = json.loads(user_dict_json)
except:
logging.info("json load error, pass")
return ''
TSNew_.memory.append(user_dict)
logging.info('add to deque TSNew_.memory size: %d  PID: %d', len(TSNew_.memory), os.getpid())
logging.info("add to deque userid: %s, nation: %s n",user_dict['user_id'],  user_dict['user_country'])
return 'SUCCESSn'

@app.route('/', methods=['POST'])
def get_ts_gbdt_id():
return get_user_result()
from werkzeug.contrib.fixers import ProxyFix
app.wsgi_app = ProxyFix(app.wsgi_app)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=4444)

我在类__init__中创建了一个多线程池,并使用self.pool映射predict_memcache的功能。 我有两个疑问: (a( 我应该在__init__初始化池还是在之前初始化它

results = self.pool.map(self.predict_memcache, user_dicts)

(b( 由于池是多线程操作,并且在process_user_dict的线程中执行,那么是否存在任何隐藏错误? 谢谢。

问题 (a(:

这要看情况。如果需要多次运行process_user_dict,则在构造函数中启动池并保持其运行是有意义的。创建线程池总是会带来一些开销,通过在调用process_user_dict之间保持池活动状态,可以避免额外的开销。

如果您只想处理一组输入,您也可以直接在process_user_dict中创建池。但可能不会在results = self.pool.map(self.predict_memcache, user_dicts)之前,因为这将为周围while循环的每次迭代创建一个池。

在您的特定情况下,它没有任何区别。您可以在模块级别创建TSNew_对象,以便在应用运行时保持活动状态(以及线程池(;来自同一TSNew实例的同一线程池用于处理app.run()生存期内的所有请求。 由于您似乎将该结构与self.process = threading.Thread(target=self.process_user_dict)一起使用self.memory上的某种侦听器,因此在构造函数中创建池在功能上等效于在process_user_dict内(但在循环外部(创建池。

问题(b(:

从技术上讲,在线程内创建线程时,默认情况下没有隐藏错误。最后,任何附加线程的最终父线程始终是MainThread,它是为每个 Python 解释器实例隐式创建的。基本上,每次在 Python 程序中创建线程时,都会在线程中创建线程。

实际上,您的代码甚至不会在线程内创建线程。您的self.pool是在MainThread中创建的。当池通过self.pool = ThreadPool(40)实例化时,它会创建所需数量 (40( 的工作线程,以及一个工作线程线程、一个任务处理程序线程和一个结果处理程序线程。所有这些都是MainThread的子线程。在self.process下,您对线程中的池所做的只是调用其map方法来为其分配任务。

但是,我真的看不出你在这里用这个self.process做什么的意义。 猜测一下,我会说您想在process_user_dict中启动循环以充当self.memory上的侦听器,以便池在self.memory开始出现在deque中时立即开始处理user_dict。从我看到你在get_user_result所做的事情来看,你似乎每个请求都会得到一个user_dict。我知道您可能有并发用户会话传入这些字典,但是您是否真的看到process_user_dict在无限循环中运行而不是简单地在TSNew_.memory.append(user_dict)之后调用TSNew_.process_user_dict()?你甚至可以完全省略self.memory并将字典直接传递给process_user_dict,除非我错过了你没有向我们展示的东西。

相关内容

  • 没有找到相关文章

最新更新