为什么来自 ThreadPool 的线程不异步运行?



我写了以下概念证明代码:

import time
from multiprocessing.pool import ThreadPool
class Maybe:
    def __init__(self):
        self._value = None
        self._exists = False
    def exists(self):
        return self._exists
    def value(self):
        if not self.exists():
            raise ValueError("Maybe doesn't hold any value")
        return self._value
    def set(self, value):
        self._value = value
        self._exists = True
    def unset(self):
        self._value = None
        self._exists = False
class Future(object):
    def __init__(self):
        self._holder = Maybe()
        self._handler = None
    def _set(self, value):
        self._holder.set(value)
        self._invoke()
    def _invoke(self):
        if self._handler and self._holder.exists():
            self._handler(self._holder.value())
    def then(self, handler):
        self._handler = handler
        self._invoke()

def fib(count):
    f,s = 0,1
    for i in xrange(count):
        f,s =s,f+s
    return s
pool = ThreadPool(5)
def test(fun, arg):
    def print_fib(x):
        print("fib => {0}, {1}n".format(arg, len(str(x))))
    tb = time.time()
    future = Future()
    future.then(print_fib)
    future._async_result = pool.apply_async(fun, [arg], callback=future._set)
    ta = time.time()
    print ("Time elapsed : {0}".format(ta - tb))
    return future

x1=test(fib, 2029)
x2=test(fib, 989999)
x3=test(fib, 103)
x4=test(fib, 38484)
x5=test(fib, 20)
time.sleep(3)

,我期望所有的电话都不同步。但是它们似乎并没有异步运行。例如,带有103参数的调用预计将在以989999为参数的呼叫之前完成。我看不到发生这种情况。即使我使用更大的参数进行第二个呼叫,也需要更多时间。

有人可以解释发生了什么吗?为什么它们不同步?

问题是ThreadPool将所有线程设置为守护程序,即,如果主线程退出,则所有线程将被终止。time.sleep(3)不足以使您的过程完成其任务,这就是原因。请注意,您的"经过的时间"日志不正确,因为它测量了发射线程所需的时间,而不是实际任务。

您应该始终.join线程。使用ThreadPool,您可以使用此代码来执行此操作:

...
x1=test(fib, 2029)
x2=test(fib, 989999)
x3=test(fib, 103)
x4=test(fib, 38484)
x5=test(fib, 20)
pool.close()
pool.join()

也将所有日志移至print_fib功能,并且应该按预期工作。

螺纹是共和国的。但是fib运行得太快了;因此,很难说它们是否同时运行。(fib(989999)除外)

尝试关注(我将经过的时间打印零件移至print_fib以正确打印经过的时间。):

def test(fun, arg):
    def print_fib(x):
        print("fib => {0}, {1}n".format(arg, len(str(x))))
        ta = time.time()
        print ("Time elapsed : {0}".format(ta - tb))
    tb = time.time()
    future = Future()
    future.then(print_fib)
    future._async_result = pool.apply_async(fun, [arg], callback=future._set)
    return future
x1 = test(time.sleep, 2)
x2 = test(time.sleep, 5)
x3 = test(time.sleep, 1)
x4 = test(time.sleep, 4)
x5 = test(time.sleep, 3)
#time.sleep(10)
x1._async_result.get()
x2._async_result.get()
x3._async_result.get()
x4._async_result.get()
x5._async_result.get()

输出:

fib => 1, 4
Time elapsed : 1.00200009346
fib => 2, 4
Time elapsed : 2.00099992752
fib => 3, 4
Time elapsed : 3.00200009346
fib => 4, 4
Time elapsed : 4.00200009346
fib => 5, 4
Time elapsed : 5.00200009346

最新更新