我写了以下概念证明代码:
import time
from multiprocessing.pool import ThreadPool
class Maybe:
def __init__(self):
self._value = None
self._exists = False
def exists(self):
return self._exists
def value(self):
if not self.exists():
raise ValueError("Maybe doesn't hold any value")
return self._value
def set(self, value):
self._value = value
self._exists = True
def unset(self):
self._value = None
self._exists = False
class Future(object):
def __init__(self):
self._holder = Maybe()
self._handler = None
def _set(self, value):
self._holder.set(value)
self._invoke()
def _invoke(self):
if self._handler and self._holder.exists():
self._handler(self._holder.value())
def then(self, handler):
self._handler = handler
self._invoke()
def fib(count):
f,s = 0,1
for i in xrange(count):
f,s =s,f+s
return s
pool = ThreadPool(5)
def test(fun, arg):
def print_fib(x):
print("fib => {0}, {1}n".format(arg, len(str(x))))
tb = time.time()
future = Future()
future.then(print_fib)
future._async_result = pool.apply_async(fun, [arg], callback=future._set)
ta = time.time()
print ("Time elapsed : {0}".format(ta - tb))
return future
x1=test(fib, 2029)
x2=test(fib, 989999)
x3=test(fib, 103)
x4=test(fib, 38484)
x5=test(fib, 20)
time.sleep(3)
,我期望所有的电话都不同步。但是它们似乎并没有异步运行。例如,带有103
参数的调用预计将在以989999
为参数的呼叫之前完成。我看不到发生这种情况。即使我使用更大的参数进行第二个呼叫,也需要更多时间。
有人可以解释发生了什么吗?为什么它们不同步?
问题是ThreadPool
将所有线程设置为守护程序,即,如果主线程退出,则所有线程将被终止。time.sleep(3)
不足以使您的过程完成其任务,这就是原因。请注意,您的"经过的时间"日志不正确,因为它测量了发射线程所需的时间,而不是实际任务。
您应该始终.join
线程。使用ThreadPool
,您可以使用此代码来执行此操作:
...
x1=test(fib, 2029)
x2=test(fib, 989999)
x3=test(fib, 103)
x4=test(fib, 38484)
x5=test(fib, 20)
pool.close()
pool.join()
也将所有日志移至print_fib
功能,并且应该按预期工作。
螺纹是共和国的。但是fib
运行得太快了;因此,很难说它们是否同时运行。(fib(989999)
除外)
尝试关注(我将经过的时间打印零件移至print_fib
以正确打印经过的时间。):
def test(fun, arg):
def print_fib(x):
print("fib => {0}, {1}n".format(arg, len(str(x))))
ta = time.time()
print ("Time elapsed : {0}".format(ta - tb))
tb = time.time()
future = Future()
future.then(print_fib)
future._async_result = pool.apply_async(fun, [arg], callback=future._set)
return future
x1 = test(time.sleep, 2)
x2 = test(time.sleep, 5)
x3 = test(time.sleep, 1)
x4 = test(time.sleep, 4)
x5 = test(time.sleep, 3)
#time.sleep(10)
x1._async_result.get()
x2._async_result.get()
x3._async_result.get()
x4._async_result.get()
x5._async_result.get()
输出:
fib => 1, 4
Time elapsed : 1.00200009346
fib => 2, 4
Time elapsed : 2.00099992752
fib => 3, 4
Time elapsed : 3.00200009346
fib => 4, 4
Time elapsed : 4.00200009346
fib => 5, 4
Time elapsed : 5.00200009346