Speeding up a custom Scipy continuous random variable



I have created a subclass of scipy.stats.rv_continuous, and it seems to be doing what I want, but it is extremely slow. The code and test results are below.

The distribution function I'm using (a broken power law) is easy to integrate and to calculate properties of analytically, so is there another internal method I should be overriding in my subclass with the analytic expressions to make it faster? The documentation isn't clear on how rvs actually draws samples, but presumably it is finding the inverse of the CDF.
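
For reference (my notation), the distribution the code below implements is the piecewise power law

    p(x) = A_i * x^alpha_i    for breaks[i] < x <= breaks[i+1],

where the A_i (pdf_norms) are fixed by requiring the PDF to be continuous at each break and to integrate to one over [breaks[0], breaks[-1]], and the CDF within a segment is that segment's analytic integral plus the accumulated integrals of the earlier segments (cdf_offsets).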

import numpy as np
import scipy as sp
import scipy.stats


class Broken_Power_Law(sp.stats.rv_continuous):
    def __init__(self, slopes, breaks, name='Broken_Power_Law'):
        """
        Here `slopes` are the power-law indices for each section, and
        `breaks` are the edges of each section such that `slopes[0]` applies
        between `breaks[0]` and `breaks[1]`, etc.
        """
        super().__init__(a=np.min(breaks), b=np.max(breaks), name=name)
        nums = len(slopes)
        # Calculate the proper normalization of the PDF semi-analytically
        pdf_norms = np.array([np.power(breaks[ii], slopes[ii-1] - slopes[ii]) if ii > 0 else 1.0
                              for ii in range(nums)])
        pdf_norms = np.cumprod(pdf_norms)
        # The additive offsets to calculate CDF values
        cdf_offsets = np.array([(an/(alp+1))*(np.power(breaks[ii+1], alp+1) -
                                              np.power(breaks[ii], alp+1))
                                for ii, (alp, an) in enumerate(zip(slopes, pdf_norms))])
        off_sum = cdf_offsets.sum()
        cdf_offsets = np.cumsum(cdf_offsets)
        pdf_norms /= off_sum
        cdf_offsets /= off_sum
        self.breaks = breaks
        self.slopes = slopes
        self.pdf_norms = pdf_norms
        self.cdf_offsets = cdf_offsets
        self.num_segments = nums
        return
    def _pdf(self, xx):
        mm = np.atleast_1d(xx)
        yy = np.zeros_like(mm)
        # For each power-law, calculate the distribution in that region 
        for ii in range(self.num_segments):
            idx = (self.breaks[ii] < mm) & (mm <= self.breaks[ii+1])
            aa = self.slopes[ii]
            an = self.pdf_norms[ii]
            yy[idx] = an * np.power(mm[idx], aa)
        return yy
    def _cdf(self, xx):
        mm = np.atleast_1d(xx)
        yy = np.zeros_like(mm)
        off = 0.0
        # For each power-law, calculate the cumulative dist in that region
        for ii in range(self.num_segments):
            # incorporate the cumulative offset from previous segments
            off = self.cdf_offsets[ii-1] if ii > 0 else 0.0
            idx = (self.breaks[ii] < mm) & (mm <= self.breaks[ii+1])
            aa = self.slopes[ii]
            an = self.pdf_norms[ii]
            ap1 = aa + 1
            yy[idx] = (an/(ap1)) * (np.power(mm[idx], ap1) - np.power(self.breaks[ii], ap1)) + off
        return yy
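
As a quick sanity check on the semi-analytic normalization (my own sketch, not part of the original class; the test points are arbitrary), the analytic CDF can be compared against a numerical integration of the PDF:

import scipy.integrate

bpl = Broken_Power_Law([-1.3, -2.2, -2.7], [0.08, 0.5, 1.0, 150.0])

# Total probability over the full support should come out to ~1.0
total, _ = sp.integrate.quad(bpl.pdf, 0.08, 150.0)
print(total)

# The analytic CDF should agree with a direct integral of the PDF
for x in (0.1, 0.7, 10.0, 100.0):
    numeric, _ = sp.integrate.quad(bpl.pdf, 0.08, x)
    print(x, float(bpl.cdf(x)), numeric)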

When testing it:

> test1 = sp.stats.norm()
> %timeit rvs = test1.rvs(size=100)
46.3 µs ± 1.87 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
> test2 = Broken_Power_Law([-1.3, -2.2, -2.7], [0.08, 0.5, 1.0, 150.0])
> %timeit rvs = test2.rvs(size=100)
200 ms ± 8.57 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)

i.e. 5000x slower!!!

One solution is to override the _rvs method itself, and draw samples using inverse transform sampling with the analytic expressions:

def _rvs(self, size=None):
    """Invert the CDF (semi)-analytically to draw samples from distribution.
    """
    if size is None:
        size = self._size
    rands = np.random.uniform(size=size)
    samps = np.zeros_like(rands)
    # Go over each segment region, find the region each random-number belongs in based on
    #    the offset values
    for ii in range(self.num_segments):
        # cdf_offsets[ii] is the CDF at the *top* of segment ii (see `_cdf`)
        lo = self.cdf_offsets[ii-1] if ii > 0 else 0.0
        hi = self.cdf_offsets[ii]
        idx = (lo <= rands) & (rands < hi)
        mlo = self.breaks[ii]
        aa = self.slopes[ii]
        an = self.pdf_norms[ii]
        ap1 = aa + 1
        vals = (ap1/an) * (rands[idx] - lo) + np.power(mlo, ap1)
        samps[idx] = np.power(vals, 1.0/ap1)
    return samps
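
Here test3 in the timing below is assumed to be the same distribution, re-instantiated after adding the _rvs method above to the class body:

test3 = Broken_Power_Law([-1.3, -2.2, -2.7], [0.08, 0.5, 1.0, 150.0])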

This is nearly as fast as the built-in sampling:

> %timeit rvs = test3.rvs(size=100)
56.8 µs ± 1 µs per loop (mean ± std. dev. of 7 runs, 10000 loops each)
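
An alternative to overriding _rvs is to override _ppf (the percent point function, i.e. the inverse CDF): rv_continuous.rvs draws samples by applying the ppf to uniform random numbers, and the default _ppf numerically inverts _cdf for every sample, which is why the generic path is so slow. A rough sketch of an analytic _ppf, reusing the same segment bookkeeping (untested, written against the attributes set in __init__ above):

def _ppf(self, qq):
    """Invert the CDF analytically (percent point function)."""
    qq = np.atleast_1d(qq)
    xx = np.zeros_like(qq)
    # Find which segment each quantile falls into, then invert that segment's CDF
    for ii in range(self.num_segments):
        lo = self.cdf_offsets[ii-1] if ii > 0 else 0.0
        hi = self.cdf_offsets[ii]
        idx = (lo <= qq) & (qq <= hi)
        aa = self.slopes[ii]
        an = self.pdf_norms[ii]
        ap1 = aa + 1
        vals = (ap1/an) * (qq[idx] - lo) + np.power(self.breaks[ii], ap1)
        xx[idx] = np.power(vals, 1.0/ap1)
    return xx

With _ppf defined, the stock rvs uses the analytic inversion directly (no _rvs override needed), and ppf, isf, median and interval become fast as well.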

Latest update