无法grok python多处理



我需要为数据库的每个元素运行一个函数。

当我尝试以下操作时:

from multiprocessing import Pool
from pymongo import Connection
def foo():
...

connection1 = Connection('127.0.0.1', 27017)
db1 = connection1.data
my_pool = Pool(6)
my_pool.map(foo, db1.index.find())

我得到以下错误:

作业1,'python-myscript.py'被信号SIGKILL(强制退出)终止

我认为,这是由于db1.index.find()在试图返回数百万个数据库元素时吃掉了所有可用的ram。。。

我应该如何修改我的代码使其工作?

这里有一些日志:

dmesg | tail -500 | grep memory
[177886.768927] Out of memory: Kill process 3063 (python) score 683 or sacrifice child
[177891.001379]  [<ffffffff8110e51a>] out_of_memory+0xfa/0x250
[177891.021362] Out of memory: Kill process 3063 (python) score 684 or sacrifice child
[177891.025399]  [<ffffffff8110e51a>] out_of_memory+0xfa/0x250

实际功能如下:

def create_barrel(item):
    connection = Connection('127.0.0.1', 27017)
    db = connection.data
    print db.index.count()
    barrel = []
    fls = []
    if 'name' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name']))
        name = item['name']
    elif 'name.utf-8' in item.keys():
        barrel.append(WhitespaceTokenizer().tokenize(item['name.utf-8']))
        name = item['name.utf-8']
    else:
        print item.keys()
    if 'files' in item.keys():
        for file in item['files']:
            if 'path' in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path'])))
                fls.append(("\".join(file['path']),file['length']))
            elif 'path.utf-8'  in file.keys():
                barrel.append(WhitespaceTokenizer().tokenize(" ".join(file['path.utf-8'])))
                fls.append(("\".join(file['path.utf-8']),file['length']))
            else:
                print file
                barrel.append(WhitespaceTokenizer().tokenize(file))
    if len(fls) < 1:
        fls.append((name,item['length']))
    barrel = sum(barrel,[])
    for s in barrel:
        vs = re.findall("d[d|.]*d", s) #versions i.e. numbes such as 4.2.7500 
    b0 = []
    for s in barrel:
        b0.append(re.split("[" + string.punctuation + "]", s))
    b1 = filter(lambda x: x not in string.punctuation, sum(b0,[]))
    flag = True
    while flag:
        bb = []
        flag = False
        for bt in b1:
            if bt[0] in string.punctuation:
                bb.append(bt[1:])
                flag = True
            elif bt[-1] in string.punctuation:
                bb.append(bt[:-1])
                flag = True
            else:
                bb.append(bt)
        b1 = bb
    b2 = b1 + barrel + vs
    b3 = list(set(b2))
    b4 = map(lambda x: x.lower(), b3)
    b_final = {}
    b_final['_id'] = item['_id']
    b_final['tags'] = b4
    b_final['name'] = name
    b_final['files'] = fls
    print db.barrels.insert(b_final)

我注意到一件有趣的事情。然后我按下ctrl+c来停止进程,我得到以下信息:

python index2barrel.py 
Traceback (most recent call last):
  File "index2barrel.py", line 83, in <module>
    my_pool.map(create_barrel, db1.index.find, 6)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 227, in map
    return self.map_async(func, iterable, chunksize).get()
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 280, in map_async
    iterable = list(iterable)
TypeError: 'instancemethod' object is not iterable

我的意思是,为什么多处理试图将某些内容转换为列表?这不是问题的根源吗?

从堆栈跟踪:

brk(0x231ccf000)                        = 0x231ccf000
futex(0x1abb150, FUTEX_WAKE_PRIVATE, 1) = 1
sendto(3, "+2602633553563257data.index"..., 43, 0, NULL, 0) = 43
recvfrom(3, "Some text from my database."..., 491663, 0, NULL, NULL) = 491663
... [manymany times]
brk(0x2320d5000)                        = 0x2320d5000
.... manymany times

上面的示例在strace输出中反复出现,出于某种原因,strace-o日志文件python myscript.py不会停止。它只是吃掉所有可用的ram并写入日志文件。

更新。用imap代替地图解决了我的问题。

由于find()操作正在返回光标映射函数,并且您说这在执行时没有问题for item in db1.index.find(): create_barrel(item)看起来CCD_ 4功能正常

你能试着限制光标中返回的结果数量吗?看看这是否有帮助?我认为语法应该是:

db1.index.find().limit(100)

如果你可以尝试一下,看看它是否有帮助,它可能有助于找出问题的原因。

编辑1:我认为你使用map函数的方式是错误的——我认为你应该在mongo-python驱动程序中使用map_reduce——这样map函数将由mongod进程执行。

map()函数将块中的项提供给给定的函数。默认情况下,这个块大小是这样计算的(链接到源):

chunksize, extra = divmod(len(iterable), len(self._pool) * 4)

在您的情况下,这可能会导致块大小过大,并导致进程内存不足。尝试手动设置区块大小,如下所示:

my_pool.map(foo, db1.index.find(), 100)

EDIT:您还应该考虑重用数据库连接,并在使用后关闭它们。现在,您为每个项目创建了新的数据库连接,并且不会对它们调用close()

编辑2:还要检查while循环是否进入无限循环(可以解释症状)。

EDIT3:根据您添加的回溯,map函数试图将光标转换为列表,从而一次提取所有项目。之所以会发生这种情况,是因为它想要查找集合中有多少项。这是pool.py:中map()代码的一部分

if not hasattr(iterable, '__len__'):
    iterable = list(iterable)

你可以尝试这样做来避免转换到列表:

cursor = db1.index.find()
cursor.__len__ = cursor.count()
my_pool.map(foo, cursor)

相关内容

  • 没有找到相关文章

最新更新