我正试图使用cherrypy+多处理(启动工作进程)+gevent(从工作进程内启动并行I/o Greenlet)的组合。似乎最简单的方法是monkeypatch多处理,因为greenlets只能在主应用程序进程中操作。
然而,看起来monkey补丁适用于多处理的某些部分,而不适用于其他部分。这是我的CherryPy服务器示例:
from gevent import monkey
monkey.patch_all()
import gevent
import cherrypy
import multiprocessing
def launch_testfuncs():
jobs = [gevent.spawn(testfunc)
for i in range(0, 12)]
gevent.joinall(jobs, timeout=10)
def testfunc():
print 'testing'
class HelloWorld(object):
def index(self):
launch_testfuncs()
return "Hello World!"
index.exposed = True
def index_proc(self):
proc = multiprocessing.Process(target=launch_testfuncs)
proc.start()
proc.join()
return "Hello World 2!"
index_proc.exposed = True
def index_pool(self):
pool = multiprocessing.Pool(1)
return "Hello World 3!"
index_pool.exposed = True
def index_namespace(self):
manager = multiprocessing.Manager()
anamespace = manager.Namespace()
anamespace.val = 23
return "Hello World 4!"
index_namespace.exposed = True
cherrypy.quickstart(HelloWorld())
猴子补丁后的以下工作:
index
-只是直接从cherrypy类中生成greenletsindex_proc
-使用multiprocessing.Process
启动一个新进程,然后从该进程生成greenlet
以下有问题:
index_pool
-启动multiprocessing.Pool
-挂起且从不返回index_namespace
-初始化multiprocessing.Manager
命名空间以管理工作线程池/集合中的共享内存-返回以下错误消息:[15/Nov/2012:17:19:31] HTTP Traceback (most recent call last): File "/Library/Python/2.7/site-packages/cherrypy/_cprequest.py", line 656, in respond response.body = self.handler() File "/Library/Python/2.7/site-packages/cherrypy/lib/encoding.py", line 188, in __call__ self.body = self.oldhandler(*args, **kwargs) File "/Library/Python/2.7/site-packages/cherrypy/_cpdispatch.py", line 34, in __call__ return self.callable(*self.args, **self.kwargs) File "server.py", line 39, in index_namespace anamespace = manager.Namespace() File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 667, in temp token, exp = self._create(typeid, *args, **kwds) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/managers.py", line 565, in _create conn = self._Client(self._address, authkey=self._authkey) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 175, in Client answer_challenge(c, authkey) File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/connection.py", line 414, in answer_challenge response = connection.recv_bytes(256) # reject large message IOError: [Errno 35] Resource temporarily unavailable
我试图在gevent文档中找到一些与此相关的文档,但找不到任何与此有关的内容。只是gevent的猴子补丁不完整吗?其他人也有类似的问题吗?有办法解决吗?
问题似乎是gevent.socket
非阻塞的结果,这意味着如果X
字节不能立即在套接字上可用,则任何socket.recv_bytes(X)
调用都会抛出该错误。具体地说,gevent.socket
被设计为永远不会阻塞套接字。
multiprocessing
的问题是因为它使用了stdlibsocket
模块,并期望它被阻塞,而在您使用了monkey.patch_all()
'd之后,socket
模块已被替换,并且multiprocessing.connection
不是为处理新的异步行为而设计的。
您可以告诉monkey
不要修补socket
,但这意味着任何在应用程序中利用异步套接字的东西都可能因此而导致一些性能损失。
要执行此操作,请使用socket=False
:patch_all(socket=False)
调用patch_all
。
这不是一个理想的解决方案,因为您几乎失去了从最初使用gevent
中获得的大部分好处。
我遇到过和你一样的问题。我的解决方案是,我刚刚使用了多处理。Process()生成固定数量的进程。最后加入所有,等待完成。
#!/usr/bin/env python
# encoding: utf-8
from gevent import monkey
monkey.patch_all()
import gevent
import multiprocessing as mp
NUM = 10
def work(i):
jobs = [gevent.spawn(func, i)
for i in range(0, 12)]
gevent.joinall(jobs)
print "{} Done {}".format(mp.current_process().name, i)
def func(x):
print "Gevent: {}".format(x)
def main():
processes = [mp.Process(name="Process-{}".format(i), target=work, args=(i,)) for i in xrange(NUM)]
for process in processes:
process.start()
for process in processes:
process.join()
if __name__ == '__main__':
main()
输出
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-0 Done 11
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-1 Done 11
Gevent: 0
Gevent: 1
Gevent: 2
Gevent: 3
Gevent: 4
Gevent: 5
Gevent: 6
Gevent: 7
Gevent: 8
Gevent: 9
Gevent: 10
Gevent: 11
Process-2 Done 11
Gevent: 0
... ...
这就是gevent使用多处理的解决方案。