我正在回答一个关于pandas
interpolation
方法的问题。 OP 只想使用连续 np.nan
s 数为 1 的插值。 interpolate
的limit=1
选项将插入第一个np.nan
并在此停止。 OP希望能够分辨出实际上不止一个np.nan
,甚至不打扰第一个。
我将其归结为按原样执行interpolate
并在事后屏蔽连续np.nan
。
问题是:什么是广义解,它采用一维数组a
和整数x
,并在 x 或多个连续np.nan
的位置生成 False 的布尔掩码
考虑一维数组a
a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
我希望x = 2
面具看起来像这样
# assume 1 for True and 0 for False
# a is [ 1. nan nan nan 1. nan 1. 1. nan nan 1. 1.]
# mask [ 1. 0. 0. 0. 1. 1. 1. 1. 0. 0. 1. 1.]
# ^
# |
# Notice that this is not masked because there is only one np.nan
我希望x = 3
面具看起来像这样
# assume 1 for True and 0 for False
# a is [ 1. nan nan nan 1. nan 1. 1. nan nan 1. 1.]
# mask [ 1. 0. 0. 0. 1. 1. 1. 1. 1. 1. 1. 1.]
# ^ ^ ^
# | | |
# Notice that this is not masked because there is less than 3 np.nan's
我期待着从别人那里学习想法;-(
我真的很喜欢 numba,因为它很容易掌握但很难"麻木"的问题!尽管该包对于大多数库来说可能有点太重了,但它允许编写类似"python"的函数而不会损失太多速度:
import numpy as np
import numba as nb
import math
@nb.njit
def mask_nan_if_consecutive(arr, limit): # I'm not good at function names :(
result = np.ones_like(arr)
cnt = 0
for idx in range(len(arr)):
if math.isnan(arr[idx]):
cnt += 1
# If we just reached the limit we need to backtrack,
# otherwise just mask current.
if cnt == limit:
for subidx in range(idx-limit+1, idx+1):
result[subidx] = 0
elif cnt > limit:
result[idx] = 0
else:
cnt = 0
return result
至少如果你使用纯python,这应该很容易理解,它应该可以工作:
>>> a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
>>> mask_nan_if_consecutive(a, 1)
array([ 1., 0., 0., 0., 1., 0., 1., 1., 0., 0., 1., 1.])
>>> mask_nan_if_consecutive(a, 2)
array([ 1., 0., 0., 0., 1., 1., 1., 1., 0., 0., 1., 1.])
>>> mask_nan_if_consecutive(a, 3)
array([ 1., 0., 0., 0., 1., 1., 1., 1., 1., 1., 1., 1.])
>>> mask_nan_if_consecutive(a, 4)
array([ 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.])
但是@nb.njit
装饰器的真正好处是,这个函数会很快:
a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1])
i = 2
res1 = mask_nan_if_consecutive(a, i)
res2 = mask_knans(a, i)
np.testing.assert_array_equal(res1, res2)
%timeit mask_nan_if_consecutive(a, i) # 100000 loops, best of 3: 6.03 µs per loop
%timeit mask_knans(a, i) # 1000 loops, best of 3: 302 µs per loop
因此,对于短数组,这大约快 50 倍,即使差异变低,但对于较长的数组来说仍然更快:
a = np.array([1, np.nan, np.nan, np.nan, 1, np.nan, 1, 1, np.nan, np.nan, 1, 1]*100000)
i = 2
%timeit mask_nan_if_consecutive(a, i) # 10 loops, best of 3: 20.9 ms per loop
%timeit mask_knans(a, i) # 10 loops, best of 3: 154 ms per loop
我创建了这个通用解决方案
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided
def mask_knans(a, x):
a = np.asarray(a)
k = a.shape[0]
# I will stride n. I want to pad with 1 less False than
# the required number of np.nan's
n = np.append(np.isnan(a), [False] * (x - 1))
# prepare the mask and fill it with True
m = np.empty(k, np.bool8)
m.fill(True)
# stride n into a number of columns equal to
# the required number of np.nan's to mask
# this is essentially a rolling all operation on isnull
# also reshape with `[:, None]` in preparation for broadcasting
# np.where finds the indices where we successfully start
# x consecutive np.nan's
s = n.strides[0]
i = np.where(strided(n, (k + 1 - x, x), (s, s)).all(1))[0][:, None]
# since I prepped with `[:, None]` when I add `np.arange(x)`
# I'm including the subsequent indices where the remaining
# x - 1 np.nan's are
i = i + np.arange(x)
# I use `pd.unique` because it doesn't sort and I don't need to sort
i = pd.unique(i[i < k])
m[i] = False
return m
没有评论
import pandas as pd
import numpy as np
from numpy.lib.stride_tricks import as_strided as strided
def mask_knans(a, x):
a = np.asarray(a)
k = a.shape[0]
n = np.append(np.isnan(a), [False] * (x - 1))
m = np.empty(k, np.bool8)
m.fill(True)
s = n.strides[0]
i = np.where(strided(n, (k + 1 - x, x), (s, s)).all(1))[0][:, None]
i = i + np.arange(x)
i = pd.unique(i[i < k])
m[i] = False
return m
演示
mask_knans(a, 2)
[ True False False False True True True True False False True True]
mask_knans(a, 3)
[ True False False False True True True True True True True True]