看,各位。我们有一个关于gevent.pool类和pool.wait_available()方法的问题,这两个代码段都是
1.
def fetch(url):
print 'start fetching...', url
data = urllib2.urlopen(url)
print url,':',data.code
urls = ['http://www.google.ru', 'http://www.s-str.ru', 'http://www.vk.com', 'http://www.yandex.ru', 'http://www.xxx.com']
pool = Pool(2)
def producer():
for url in urls:
pool.spawn(fetch, url)
pool.join()
p = gevent.spawn(producer)
p.join()
2.
def fetch(url):
print 'start fetching...', url
data = urllib2.urlopen(url)
print url,':',data.code
urls = ['http://www.google.ru', 'http://www.s-str.ru', 'http://www.vk.com', 'http://www.yandex.ru', 'http://www.xxx.com']
pool = Pool(2)
def producer():
for url in urls:
pool.wait_available()
pool.spawn(fetch, url)
pool.join()
p = gevent.spawn(producer)
p.join()
给我们类似的结果:
start fetching... http://www.google.ru
start fetching... http://www.s-str.ru
http://www.google.ru : 200
start fetching... http://www.vk.com
http://www.s-str.ru : 200
start fetching... http://www.yandex.ru
http://www.yandex.ru : 200
start fetching... http://www.xxx.com
http://www.vk.com : 200
http://www.xxx.com : 200
有人能解释wait_available()方法的含义吗?以及可能的使用情况。
======更新========我已经把它搞砸了,它工作正常,我只想知道这两个代码片段之间的区别。
TL;DR:如果使用派生,则不需要wait_available
,因为在两种方法中都运行相同的检查。但是,如果您正在使用apply_async
,并且不希望提交超过池上限的线程,那么您应该首先调用wait_available
。
为了更清楚的解释。。有几种方法可以通过gevent的Pool
类实现相同的功能。在池上使用spawn
将被阻止,直到Pool
中有可用空间来运行新的greenlet。这里有一个快速的例子:
import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
import time
def my_slow_function():
time.sleep(5)
def log(text):
print '%d : %s' % (int(time.time()), text)
if __name__ == '__main__':
thread_pool = gevent.pool.Pool(5)
for i in xrange(20):
log('Submitting slow func %d' % i)
thread_pool.spawn(my_slow_function)
thread_pool.join()
log('Exiting')
这个输出显示,它将以5个一组的方式生成这些,因为池中包含5个greenlets的空间:
1403037287:提交慢功能0
1403037297:提交慢速功能1
140 3037287∶提交慢速功能2
14 3037287>提交慢速功能3
40 3037287=提交慢速功能4
1 1403037292:提交慢速函数5
49
1403037297:提交慢功能10
140 3037297:提交慢功能11
14 3037297∶提交慢功能12
40 3037297=提交慢功能13
1400 3037297-提交慢功能14>140 3037302:提交慢功能15
400 3037302-提交慢函数16
2014 3037302∶提交慢函数17
4 3037302=提交慢函数18
19
1403037307:退出
正如你所看到的,它们以5个为一组的方式繁殖,大约间隔5秒。如果深入研究gevent代码并查看Pool对象,可以看到调用spawn
将要求锁定Pool
的内部信号量,该信号量用于跟踪运行的greenlet。
相反,如果您使用apply_async
而不是spawn
尝试相同的代码,它将强制所有调用同时运行:
1403037313:提交慢功能0
1403037323:提交慢速功能1
140 3037313∶提交慢速功能2
14 3037313>提交慢速功能3
40 3037313=提交慢速功能4
1 140303731 3:提交低速功能5
49
1403037313:提交慢功能10
140 3037313∶提交慢功能11
14 3037313>提交慢功能13
40 3037313=提交慢功能14
4 3037313-提交慢功能15
10 3037313~提交慢功能16
1 4037313∶报送慢功能17
2 140303731 3∶报送慢速功能18
19
1403037318:退出
您可以在这里看到,没有阻塞或等待,它们都是同时插入的。但是,如果在for循环开始时抛出wait_available()
,则返回到与spawn
类似的行为。
1403038292:提交慢功能0
140308292:提交慢功能1
11403038292:提交慢功能2
10403038292:-提交慢功能3
14003038292::提交慢功能4
1403038297:提交慢功能5
14043038297:-提交慢函数6
14023038297::提交慢函数7
14013038297:9
1403038302:提交慢功能10
140 3038302:提交慢功能11
14 3038302∶提交慢功能13
40 3038302=提交慢功能14
4 3038307:提交慢函数15
1 4038307:提交慢功能16
10 3038307∶提交慢函数17
2 1403038307=提交慢函数18
19
1403038312:退出
再次查看gevent中的源代码,wait_available
执行与调用spawn
相同的检查,即检查信号量以查看池中是否真的有空间。
使用gevent
之前需要修补标准模块。
>>> import gevent.monkey
>>> gevent.monkey.patch_all()
>>> ...
>>> p = gevent.spawn(producer)
>>> p.join()
start fetching... http://www.google.ru
start fetching... http://www.s-str.ru
http://www.google.ru : 200
start fetching... http://www.vk.com
http://www.vk.com : 200
start fetching... http://www.yandex.ru
http://www.yandex.ru : 200
start fetching... http://www.xxx.com
http://www.xxx.com : 200
http://www.s-str.ru : 200
您可以看到,pool.wait_available()
的工作是可预测的。
更新
Pool
仅对spawn
功能起相同的作用(它将等待池中可用的"插槽")。如果你需要提供基于Pool
状态的其他功能(日志记录、跟踪、监控),你肯定会使用wait_available
、free_count
等功能。如果你只需要spawn
新的绿色线程,你可以依赖Pool
实现。