geventpool.wait_available()方法的含义



看,各位。我们有一个关于gevent.pool类和pool.wait_available()方法的问题,这两个代码段都是

1.

def fetch(url):
print 'start fetching...', url
data = urllib2.urlopen(url)
print url,':',data.code
urls = ['http://www.google.ru', 'http://www.s-str.ru', 'http://www.vk.com', 'http://www.yandex.ru', 'http://www.xxx.com']
pool = Pool(2)
def producer():
for url in urls:
pool.spawn(fetch, url)
pool.join()
p = gevent.spawn(producer)
p.join()

2.

def fetch(url):
print 'start fetching...', url
data = urllib2.urlopen(url)
print url,':',data.code
urls = ['http://www.google.ru', 'http://www.s-str.ru', 'http://www.vk.com', 'http://www.yandex.ru', 'http://www.xxx.com']
pool = Pool(2)
def producer():
for url in urls:
pool.wait_available()
pool.spawn(fetch, url)
pool.join()
p = gevent.spawn(producer)
p.join()

给我们类似的结果:

start fetching... http://www.google.ru
start fetching... http://www.s-str.ru
http://www.google.ru : 200
start fetching... http://www.vk.com
http://www.s-str.ru : 200
start fetching... http://www.yandex.ru
http://www.yandex.ru : 200
start fetching... http://www.xxx.com
http://www.vk.com : 200
http://www.xxx.com : 200

有人能解释wait_available()方法的含义吗?以及可能的使用情况。

======更新========我已经把它搞砸了,它工作正常,我只想知道这两个代码片段之间的区别。

TL;DR:如果使用派生,则不需要wait_available,因为在两种方法中都运行相同的检查。但是,如果您正在使用apply_async,并且不希望提交超过池上限的线程,那么您应该首先调用wait_available


为了更清楚的解释。。有几种方法可以通过gevent的Pool类实现相同的功能。在池上使用spawn将被阻止,直到Pool中有可用空间来运行新的greenlet。这里有一个快速的例子:

import gevent.monkey
gevent.monkey.patch_all()
import gevent.pool
import time
def my_slow_function():
time.sleep(5)
def log(text):
print '%d : %s' % (int(time.time()), text)
if __name__ == '__main__':
thread_pool = gevent.pool.Pool(5)
for i in xrange(20):
log('Submitting slow func %d' % i)
thread_pool.spawn(my_slow_function)
thread_pool.join()
log('Exiting')

这个输出显示,它将以5个一组的方式生成这些,因为池中包含5个greenlets的空间:

1403037287:提交慢功能0
1403037297:提交慢速功能1
140 3037287∶提交慢速功能2
14 3037287>提交慢速功能3
40 3037287=提交慢速功能4
1 1403037292:提交慢速函数5
49
1403037297:提交慢功能10
140 3037297:提交慢功能11
14 3037297∶提交慢功能12
40 3037297=提交慢功能13
1400 3037297-提交慢功能14>140 3037302:提交慢功能15
400 3037302-提交慢函数16
2014 3037302∶提交慢函数17
4 3037302=提交慢函数18
19
1403037307:退出

正如你所看到的,它们以5个为一组的方式繁殖,大约间隔5秒。如果深入研究gevent代码并查看Pool对象,可以看到调用spawn将要求锁定Pool的内部信号量,该信号量用于跟踪运行的greenlet。

相反,如果您使用apply_async而不是spawn尝试相同的代码,它将强制所有调用同时运行:

1403037313:提交慢功能0
1403037323:提交慢速功能1
140 3037313∶提交慢速功能2
14 3037313>提交慢速功能3
40 3037313=提交慢速功能4
1 140303731 3:提交低速功能5
49
1403037313:提交慢功能10
140 3037313∶提交慢功能11
14 3037313>提交慢功能13
40 3037313=提交慢功能14
4 3037313-提交慢功能15
10 3037313~提交慢功能16
1 4037313∶报送慢功能17
2 140303731 3∶报送慢速功能18
19
1403037318:退出

您可以在这里看到,没有阻塞或等待,它们都是同时插入的。但是,如果在for循环开始时抛出wait_available(),则返回到与spawn类似的行为。

1403038292:提交慢功能0
140308292:提交慢功能1
11403038292:提交慢功能2
10403038292:-提交慢功能3
14003038292::提交慢功能4
1403038297:提交慢功能5
14043038297:-提交慢函数6
14023038297::提交慢函数7
14013038297:9
1403038302:提交慢功能10
140 3038302:提交慢功能11
14 3038302∶提交慢功能13
40 3038302=提交慢功能14
4 3038307:提交慢函数15
1 4038307:提交慢功能16
10 3038307∶提交慢函数17
2 1403038307=提交慢函数18
19
1403038312:退出

再次查看gevent中的源代码,wait_available执行与调用spawn相同的检查,即检查信号量以查看池中是否真的有空间。

使用gevent之前需要修补标准模块。

>>> import gevent.monkey
>>> gevent.monkey.patch_all()
>>> ...
>>> p = gevent.spawn(producer)
>>> p.join()
start fetching... http://www.google.ru
start fetching... http://www.s-str.ru
http://www.google.ru : 200
start fetching... http://www.vk.com
http://www.vk.com : 200
start fetching... http://www.yandex.ru
http://www.yandex.ru : 200
start fetching... http://www.xxx.com
http://www.xxx.com : 200
http://www.s-str.ru : 200

您可以看到,pool.wait_available()的工作是可预测的。

更新

Pool仅对spawn功能起相同的作用(它将等待池中可用的"插槽")。如果你需要提供基于Pool状态的其他功能(日志记录、跟踪、监控),你肯定会使用wait_availablefree_count等功能。如果你只需要spawn新的绿色线程,你可以依赖Pool实现。

相关内容

  • 没有找到相关文章

最新更新