asyncresult .success()返回false, get()引发属性错误



我第一次尝试在多处理模块中使用Python的ThreadPool来加速一些非常慢的日志解析。

不幸的是,它似乎不能正常工作。我在谷歌上也找不到有类似情况的人。我调用pool.join()来等待线程完成,然后迭代它们以访问它们的返回值。然而,我发现虽然AsyncResult.ready()返回true, asyncresult .success()返回false。当我调用get()时,会抛出一个属性错误。

Traceback (most recent call last):
  File "C:Usersluke.timothyDocumentsAptana Studio 3 WorkspaceMonitormonitor.py", line 175, in <module>
    stamp = threads[i].get()
  File "C:Python27libmultiprocessingpool.py", line 528, in get
    raise self._value
AttributeError: _strptime

我还发现,在join()函数返回时,只有4个线程已经完成。这是出乎意料的,因为从文档中我认为join在返回之前等待所有池项完成。我还发现,如果我在访问返回值之前在每个线程上调用AsyncResult.wait(),那么什么也不会发生。它根本不需要等待。

代码如下:

def parse(all_logs):
    current_time_local = datetime.now()
    print "Parsing."
    stamps = []
    for line in all_logs:
        match = re.match(match_string, line)
        if match:
            for i in range(4):
                if match.group(1 + (i * 3)):
                    wheren = match.group(1 + (i * 3)).rstrip().strip("[").strip("]").split(",")
                    break
            stamp = datetime.strptime(wheren[0], "%Y-%m-%d %H:%M:%S")
            if stamp.day == current_time_local.day or (stamp.day  == current_time_local.day-1 and stamp.hour >= current_time_local.hour):
                try:
                    name, aliases, ipaddrlist = socket.gethostbyaddr(wheren[1].split(":")[1])
                except:
                    continue
                stamps.append(Event(stamp,name,match.groups()))
    print "Returning stamps."
    return stamps
pool = ThreadPool(processes=8)
threads = []
for i in range(8):
    range_begin = i * logs_fraction
    range_end = range_begin + logs_fraction
    print "begin: " + str(range_begin) + " end: " + str(range_end)
    thread_args = []
    thread_args.extend(all_logs[range_begin:range_end])
    threads.append(pool.apply_async(parse, (thread_args, )))
pool.close()
pool.join()
for i in range(8):
    print "Getting thread " + str(i+1)
    print threads[i].ready()
    print threads[i].successful()
    print "Thread Ready."
    stamp = threads[i].get()
    print stamp
    stamps.extend(stamp)

有人能帮忙吗?我以前从未使用过这个模块,就我在谷歌上搜索的结果而言,学习它的材料相当稀少。官方的Python文档只能帮我到目前为止…

根据这个链接,您遇到了datetime库中的线程安全问题。

上周五,我遇到了一个Python Bug,所以这个周末我花了一些时间调查这个bug,并写了这篇文章来解释根本原因。我不是Python专家,而是C程序员。如果你发现如有错误,请指正。

我在这里提取了最小POC:

#!/usr/bin/env python
import thread
import time
def thread_fn():
    for _ in xrange(1, 10):
        for _ in xrange(1, 100):
            time.strptime("2013-06-02", "%Y-%m-%d")
for _ in xrange(10):
    thread.start_new_thread(thread_fn, ())
time.sleep(1)

上层代码有时会抛出异常:AttributeError: _strptime_time,您可以在您的环境中运行它并检查输出。

我检查了Python-2.7.2(Mac默认)和Python-2.7.3(从源代码)。我随机得到这个错误,这意味着有时脚本工作良好!

和变通方法:

你应该意识到这将是一个多线程问题,对吗?这是time_strptime的实现,

static PyObject *
time_strptime(PyObject *self, PyObject *args)
{
    PyObject *strptime_module = PyImport_ImportModuleNoBlock("_strptime");
    PyObject *strptime_result;
    if (!strptime_module)
        return NULL;
    strptime_result = PyObject_CallMethod(strptime_module,
                                            "_strptime_time", "O", args);
    Py_DECREF(strptime_module);
    return strptime_result;
}

每次调用该函数时,它都会尝试加载模块"_strptime"。API PyImport_ImportModuleNoBlock的算法为如果有一个线程正在导入该模块,它将抛出异常而不是阻塞。这避免了重复的模块导入和潜在死锁。

但是在多线程环境中,当一个线程试图导入_strptime,但没有完全导入,其他线程试图直接调用strptime_module._strptime_time。这就是为什么会有bug发生。

如果你很清楚为什么会发生这个bug,你应该已经知道了你内心的变通。其实很简单。您所需要做的就是在开始之前调用一次strptime线程。

因此,您似乎可以通过在创建线程之前直接导入_strptime来解决它。

这是官方的bug报告,里面有更多的信息。

最新更新