Pybind11并发中的并行处理问题::Parallel_for



我有一个python代码,可以对矩阵执行过滤。我已经使用pybind11创建了一个C++接口,它成功地以串行化的方式运行(请参阅下面的代码(。

我正在尝试使它进行并行处理,以期与序列化版本相比减少计算时间。为了做到这一点,我将大小为M×N的数组拆分为大小为M×(N/3)的三个子矩阵,以使用相同的接口并行处理它们。

我使用ppl.h库制作了一个并行的for循环,并在每个循环中调用大小为M×(N/3)的子矩阵上的python函数。

#include <iostream>
#include <ppl.h>
#include "pybind11/embed.h"
#include <pybind11/iostream.h>
#include <pybind11/stl_bind.h>
#include "pybind11/eigen.h"
#include "pybind11/stl.h"
#include "pybind11/numpy.h"
#include "pybind11/functional.h"
#include <Eigen/Dense>
namespace py = pybind11;
class myClass
{
public:
myClass()
{
m_module = py::module::import("myFilterScript");
m_handle = m_module.attr("medianFilter");
};
void medianFilterSerialized(Eigen::Ref<Eigen::MatrixXf> input, int windowSize) 
{
Eigen::MatrixXf output;
output.resizeLike(input);
output = m_handle(input, windowSize).cast<Eigen::MatrixXf>();
};
void medianFilterParallelizedUsingPPL(Eigen::Ref<Eigen::MatrixXf> input, int windowSize) 
{
Eigen::MatrixXf output;
output.resizeLike(input);
/* Acquire GIL before calling Python code */
//py::gil_scoped_acquire acquire;
Concurrency::parallel_for(size_t(0), size_t(3), [&](size_t i)
{
output.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3) = m_handle(input.block(0, i * input.cols() / 3, input.rows(), input.cols() / 3).array(), windowSize).cast<Eigen::MatrixXf>();
});
//py::gil_scoped_release release;
};
private:
py::scoped_interpreter m_guard;
py::module m_module;
py::handle m_handle;
py::object m_object;
};

int main()
{
myClass c;
Eigen::MatrixXf input = Eigen::MatrixXf::Random(240, 120);
c.medianFilterSerialized(input, 3); 
c.medianFilterParallelizedUsingPPL(input, 3);
return 0;
}

myFilterScript.py:

import threading
import numpy as np
import bottleneck as bn # can be installed from https://pypi.org/project/Bottleneck/
def medianFilter(input, windowSize):
return bn.move_median(input, window=windowSize, axis=0)

考虑到使用py::gil_scoped_acquire,我的代码在到达for循环时崩溃:

Access violation reading location // or:
Unhandled exception at 0x00007FF98BB8DB8E (ucrtbase.dll) in Pybind11_Parallelizing.exe: Fatal program exit requested.

有人能帮我理解python模块的加载函数是否可以以多处理或多线程方式并行调用吗?我的代码中缺少什么?请告诉我。提前谢谢。

py::gil_scoped_acquire是一个RAII对象,用于获取范围内的GIL,类似地,py::gil_scoped_release在"反向"RAII中用于释放范围内的GIL。因此,在相关范围内,您只需要前者。

获取GIL的作用域在调用Python的函数上,因此在传递给parallel_for的lambda内部:执行的每个线程都需要保存用于访问任何Python对象或API的GIL,在本例中为m_handle。然而,在lambda中这样做会完全序列化代码,使线程的使用变得毫无意义,因此它会因为错误的原因解决问题。

这将是使用pybind11中没有直接支持的子解释器的情况(https://pybind11.readthedocs.io/en/stable/advanced/embedding.html#sub-interpreter-support(,因此C API将是入场券(https://docs.python.org/3/c-api/init.html#c.Py_NewInterpreter)。重点是操作的数据是非Python的,所有操作原则上都是独立的。

但是,您需要知道Bottleneck是否是线程安全的。粗略地看,它似乎是因为它没有全局/静态数据AFAICT。理论上,还有一些并行化的空间:当调用move_median时,当它进入用于绑定Bottleneck的Cython代码时,你需要保持GIL(它打开变量的框,从而调用Python API(,然后Cython可以在输入Bottleneck的C代码时释放GIL,并在退出时重新获取,然后在RAII范围结束时在lambda中释放。然后,C代码并行运行。

但问题变成了:为什么首先要通过C++的Python绑定来调用C库?这里似乎是一个微不足道的解决方案:跳过Python,直接调用move_medianC函数。

最新更新