Mask only where there are x or more consecutive NaNs



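I was answering a question about the pandas interpolate method. The OP wanted to interpolate only where the number of consecutive np.nan values was exactly one. The limit=1 option of interpolate fills the first np.nan of every gap and stops there. The OP wanted to be able to tell that a gap actually contains more than one np.nan and, in that case, not touch even the first one.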

I boiled this down to running interpolate as is and masking the consecutive np.nan values after the fact.

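The question is: what is a generalized solution that takes a 1-d array a and an integer x, and produces a boolean mask that is False at the positions of x or more consecutive np.nan values?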

Consider the 1-d array a

a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])

For x = 2 I'd expect the mask to look like this

# assume 1 for True and 0 for False 
# a is [  1.  nan  nan  nan   1.  nan   1.   1.  nan  nan   1.   1.]
# mask [  1.   0.   0.   0.   1.   1.   1.   1.   0.   0.   1.   1.]
#                                  ^
#                                  |
#   Notice that this is not masked because there is only one np.nan

For x = 3 I'd expect the mask to look like this

# assume 1 for True and 0 for False 
# a is [  1.  nan  nan  nan   1.  nan   1.   1.  nan  nan   1.   1.]
# mask [  1.   0.   0.   0.   1.   1.   1.   1.   1.   1.   1.   1.]
#                                  ^              ^    ^
#                                  |              |    |
# Notice that this is not masked because there is less than 3 np.nan's
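
To pin down the expected behaviour, here is a minimal pure-Python sketch of the specification. It is a slow reference, not a proposed solution; the name reference_mask is made up for illustration. It groups the values into runs and marks a run as False only when it consists of NaNs and is at least x long.

```python
from itertools import groupby
import math


def reference_mask(values, x):
    """Return a list of bools that is False inside runs of >= x consecutive NaNs."""
    mask = []
    # groupby splits the sequence into maximal runs of NaN / non-NaN values
    for is_nan, run in groupby(values, key=math.isnan):
        length = len(list(run))
        keep = not (is_nan and length >= x)
        mask.extend([keep] * length)
    return mask


nan = float("nan")
a = [1, nan, nan, nan, 1, nan, 1, 1, nan, nan, 1, 1]
print(reference_mask(a, 2))
print(reference_mask(a, 3))
```

Both calls should reproduce the two masks sketched above, with True in place of 1 and False in place of 0.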

I look forward to learning from other people's ideas ;-)

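I really like numba for problems like this one that are easy to grasp but hard to "numpyfy" (vectorize with NumPy)! Even though the package might be a bit too heavy a dependency for most libraries, it lets you write plain "python"-like functions without losing too much speed: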

import numpy as np
import numba as nb
import math

@nb.njit
def mask_nan_if_consecutive(arr, limit):  # I'm not good at function names :(
    # ones_like keeps arr's dtype, so the mask holds 1.0/0.0, not True/False
    result = np.ones_like(arr)
    cnt = 0
    for idx in range(len(arr)):
        if math.isnan(arr[idx]):
            cnt += 1
            # If we just reached the limit we need to backtrack,
            # otherwise just mask current.
            if cnt == limit:
                for subidx in range(idx-limit+1, idx+1):
                    result[subidx] = 0
            elif cnt > limit:
                result[idx] = 0
        else:
            cnt = 0
    return result

If you have written pure Python before, this should be quite easy to understand, and it works as expected:

>>> a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
>>> mask_nan_if_consecutive(a, 1)
array([ 1.,  0.,  0.,  0.,  1.,  0.,  1.,  1.,  0.,  0.,  1.,  1.])
>>> mask_nan_if_consecutive(a, 2)
array([ 1.,  0.,  0.,  0.,  1.,  1.,  1.,  1.,  0.,  0.,  1.,  1.])
>>> mask_nan_if_consecutive(a, 3)
array([ 1.,  0.,  0.,  0.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.])
>>> mask_nan_if_consecutive(a, 4)
array([ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.])
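
The returned array is float, so it needs a cast before it can serve as a boolean mask. The sketch below shows one way to use it for the original goal: interpolate everything, then blank out the long gaps again. The array b is hypothetical data with the same NaN pattern as a, the mask is copied from the limit=2 output above, and np.interp is only a stand-in for the pandas interpolate call.

```python
import numpy as np

# Hypothetical data: same NaN pattern as `a`, but distinct values so the
# interpolated points are easy to spot.
b = np.array([0, np.nan, np.nan, np.nan, 4, np.nan, 6, 7, np.nan, np.nan, 10, 11])

# Float mask as returned by mask_nan_if_consecutive(a, 2) above.
float_mask = np.array([1., 0., 0., 0., 1., 1., 1., 1., 0., 0., 1., 1.])
bool_mask = float_mask.astype(bool)

# Linear interpolation over every gap (stand-in for pandas interpolate).
pos = np.arange(b.size)
known = ~np.isnan(b)
filled = np.interp(pos, pos[known], b[known])

# Keep interpolated values only where the mask allows it.
result = np.where(bool_mask, filled, np.nan)
print(result)
```

Only the isolated NaN at position 5 ends up filled (with 5.0); the runs of three and two NaNs stay NaN.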

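But the really nice thing about the @nb.njit decorator is that this function will be fast. Here it is compared with the strided mask_knans solution from the other answer: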

a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
i = 2
res1 = mask_nan_if_consecutive(a, i)
res2 = mask_knans(a, i)
np.testing.assert_array_equal(res1, res2)
%timeit mask_nan_if_consecutive(a, i)  # 100000 loops, best of 3: 6.03 µs per loop
%timeit mask_knans(a, i)               # 1000 loops, best of 3: 302 µs per loop

So for short arrays this is approximately 50 times faster. The gap narrows for longer arrays (roughly 7 times here), but it is still faster:

a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1]*100000)
i = 2
%timeit mask_nan_if_consecutive(a, i)  # 10 loops, best of 3: 20.9 ms per loop
%timeit mask_knans(a, i)               # 10 loops, best of 3: 154 ms per loop

I created this generalized solution

import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided
def mask_knans(a, x):
    a = np.asarray(a)
    k = a.shape[0]
    # I will stride n.  I want to pad with 1 less False than
    # the required number of np.nan's
    n = np.append(np.isnan(a), [False] * (x - 1))
    # prepare the mask and fill it with True
    m = np.ones(k, dtype=bool)
    # stride n into a number of columns equal to
    # the required number of np.nan's to mask
    # this is essentially a rolling all operation on isnull
    # also reshape with `[:, None]` in preparation for broadcasting
    # np.where finds the indices where we successfully start
    # x consecutive np.nan's
    s = n.strides[0]
    i = np.where(strided(n, (k + 1 - x, x), (s, s)).all(1))[0][:, None]
    # since I prepped with `[:, None]` when I add `np.arange(x)`
    # I'm including the subsequent indices where the remaining
    # x - 1 np.nan's are
    i = i + np.arange(x)
    # I use `pd.unique` because it doesn't sort and I don't need to sort
    i = pd.unique(i[i < k])
    m[i] = False
    return m

Without comments

import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided
def mask_knans(a, x):
    a = np.asarray(a)
    k = a.shape[0]
    n = np.append(np.isnan(a), [False] * (x - 1))
    m = np.ones(k, dtype=bool)
    s = n.strides[0]
    i = np.where(strided(n, (k + 1 - x, x), (s, s)).all(1))[0][:, None]
    i = i + np.arange(x)
    i = pd.unique(i[i < k])
    m[i] = False
    return m
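
The "rolling all" step can also be written without hand-computed strides. The following is a sketch, assuming NumPy 1.20 or later, where numpy.lib.stride_tricks.sliding_window_view is available. The function name mask_knans_windowed and the handling of out-of-range x are my own choices, not part of the solution above.

```python
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view


def mask_knans_windowed(a, x):
    """Boolean mask that is False wherever x or more consecutive NaNs occur."""
    if x < 1:
        raise ValueError("x must be at least 1")
    isnan = np.isnan(np.asarray(a, dtype=float))
    mask = np.ones(isnan.shape[0], dtype=bool)
    if x > isnan.shape[0]:
        # No window of length x fits, so nothing can be masked
        return mask
    # starts[j] is True when a[j:j + x] consists only of NaNs
    starts = sliding_window_view(isnan, x).all(axis=1)
    # Expand every window start into the x positions it covers
    idx = np.flatnonzero(starts)[:, None] + np.arange(x)
    mask[idx.ravel()] = False
    return mask
```

Duplicate indices from overlapping windows are harmless here, because assigning False to the same position twice has no further effect.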

Demo

mask_knans(a, 2)
[ True False False False  True  True  True  True False False  True  True]

mask_knans(a, 3)
[ True False False False  True  True  True  True  True  True  True  True]
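
For comparison, the same masks can be derived from run lengths instead of windows. This is a sketch of an alternative that is not taken from the answers above, and the name mask_by_run_length is hypothetical. It finds where each NaN run starts and ends with np.diff, then spreads each run's "long enough" flag over its elements with np.repeat.

```python
import numpy as np


def mask_by_run_length(a, x):
    """Boolean mask that is False inside NaN runs of length >= x."""
    isnan = np.isnan(np.asarray(a, dtype=float))
    # Pad with False on both sides so every run has a start and an end edge
    padded = np.concatenate(([False], isnan, [False]))
    edges = np.diff(padded.astype(np.int8))
    starts = np.flatnonzero(edges == 1)   # first index of each NaN run
    ends = np.flatnonzero(edges == -1)    # one past the last index of each run
    lengths = ends - starts
    mask = np.ones(isnan.shape[0], dtype=bool)
    # NaN positions appear in run order, so repeating each run's flag
    # by its length lines up with them one to one
    mask[isnan] = ~np.repeat(lengths >= x, lengths)
    return mask


a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
print(mask_by_run_length(a, 2))
print(mask_by_run_length(a, 3))
```

The amount of work does not grow with x, since no windows of width x are materialized.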
