在我的项目中,我使用pybind11将c++代码绑定到Python。最近,我不得不处理非常大的数据集(70GB以上),并且需要将来自一个std::deque
的数据拆分到多个std::deque
之间。由于我的数据集如此之大,我希望拆分不会有太多的内存开销。因此,我采取了"一放一推"的策略,一般来说,这应该能确保我的要求得到满足。
这些都是理论上的。在实践中,我的过程被扼杀了。所以我挣扎了两天,最终想出了下面这个最小的例子来演示这个问题。
一般来说,最小的例子在deque
(~11GB)中创建一堆数据,将其返回给Python,然后再次调用C++
来移动元素。就是这么简单。移动部分在执行器中完成。
有趣的是,如果我不使用executor,内存使用就会像预期的那样,而且当ulimit对虚拟内存施加限制时,程序会真正尊重这些限制,不会崩溃。
test.py
from test import _test
import asyncio
import concurrent
async def test_main(loop, executor):
numbers = _test.generate()
# moved_numbers = _test.move(numbers) # This works!
moved_numbers = await loop.run_in_executor(executor, _test.move, numbers) # This doesn't!
if __name__ == '__main__':
loop = asyncio.get_event_loop()
executor = concurrent.futures.ThreadPoolExecutor(1)
task = loop.create_task(test_main(loop, executor))
loop.run_until_complete(task)
executor.shutdown()
loop.close()
test.cpp
#include <deque>
#include <iostream>
#include <pybind11/pybind11.h>
#include <pybind11/stl.h>
namespace py = pybind11;
PYBIND11_MAKE_OPAQUE(std::deque<uint64_t>);
PYBIND11_DECLARE_HOLDER_TYPE(T, std::shared_ptr<T>);
template<class T>
void py_bind_opaque_deque(py::module& m, const char* type_name) {
py::class_<std::deque<T>, std::shared_ptr<std::deque<T>>>(m, type_name)
.def(py::init<>())
.def(py::init<size_t, T>());
}
PYBIND11_PLUGIN(_test) {
namespace py = pybind11;
pybind11::module m("_test");
py_bind_opaque_deque<uint64_t>(m, "NumbersDequeue");
// Generate ~11Gb of data.
m.def("generate", []() {
std::deque<uint64_t> numbers;
for (uint64_t i = 0; i < 1500 * 1000000; ++i) {
numbers.push_back(i);
}
return numbers;
});
// Move data from one dequeue to another.
m.def("move", [](std::deque<uint64_t>& numbers) {
std::deque<uint64_t> numbers_moved;
while (!numbers.empty()) {
numbers_moved.push_back(std::move(numbers.back()));
numbers.pop_back();
}
std::cout << "Done!n";
return numbers_moved;
});
return m.ptr();
}
测试/__init__ . py
import warnings
warnings.simplefilter("default")
:
g++ -std=c++14 -O2 -march=native -fPIC -Iextern/pybind11 `python3.5-config --includes` `python3.5-config --ldflags` `python3.5-config --libs` -shared -o test/_test.so test.cpp
观察:
- 当移动部分不是由executor完成时,所以我们只调用
moved_numbers = _test.move(numbers)
,一切都如预期的那样工作,htop显示的内存使用情况保持在11Gb
左右,太棒了! - 当移动部分在执行器中完成时,程序占用双倍的内存并崩溃。
当引入虚拟内存限制(~15Gb)时,一切正常,这可能是最有趣的部分。
ulimit -Sv 15000000 && python3.5 test.py
>>Done!
.当我们增加限制程序崩溃(150Gb>我的RAM)。
ulimit -Sv 150000000 && python3.5 test.py
>>[1] 2573 killed python3.5 test.py
使用deque方法
shrink_to_fit
没有帮助(也不应该)
Ubuntu 14.04
gcc version 5.4.1 20160904 (Ubuntu 5.4.1-2ubuntu1~14.04)
Python 3.5.2
pybind11 latest release - v1.8.1
注意
请注意,这个例子只是为了演示这个问题。使用asyncio
和pybind
是出现问题的必要条件。
这个问题原来是由于在一个线程中创建数据,然后在另一个线程中释放引起的。这是因为glibc中的malloc竞技场(参见此参考)。可以这样很好地演示:
executor1 = concurrent.futures.ThreadPoolExecutor(1)
executor2 = concurrent.futures.ThreadPoolExecutor(1)
numbers = await loop.run_in_executor(executor1, _test.generate)
moved_numbers = await loop.run_in_executor(executor2, _test.move, numbers)
占用_test.generate
和
executor = concurrent.futures.ThreadPoolExecutor(1)
numbers = await loop.run_in_executor(executor, _test.generate)
moved_numbers = await loop.run_in_executor(executor, _test.move, numbers)
伤口不。
这个问题可以通过重写代码来解决,这样就不会将元素从一个容器移动到另一个容器(我的情况),或者通过设置环境变量export MALLOC_ARENA_MAX=1
来限制malloc竞技场的数量为1。然而,这可能会涉及到一些性能问题(有一个很好的理由使用多个竞技场)。