How can I replace a 'for' loop with 'xarray.apply_ufunc' to run a linear regression between "x" and "y" over an 11-day moving window in an xr.Dataset?



I want to estimate the linear slope between "x" and "y" for each 11-day moving window, with a stride of 1 day.

from sklearn import linear_model
import numpy as np
import xarray as xr
import pandas as pd

# Create a dataset as an example
site = np.linspace(0, 3, num=4, dtype='int8')
time = pd.date_range('2018-01-01', '2020-12-31', freq='d')
x = np.random.randint(0, 500, size=[len(site), len(time)])
y = np.random.randint(0, 500, size=[len(site), len(time)])
_ds = xr.Dataset(
    data_vars=dict(
        x=(["site", "time"], x),
        y=(["site", "time"], y)),
    coords=dict(
        site=site,
        time=time))

# define the linear regression model
def ransac_fit(xi, yi, **ransac_kwargs):
    # sklearn expects a 2-D feature matrix of shape (n_samples, n_features)
    Xi = xi.reshape(-1, 1)
    ransac = linear_model.RANSACRegressor(**ransac_kwargs)
    ransac.fit(Xi, yi)
    slope = ransac.estimator_.coef_  # array of shape (1,)
    b = ransac.estimator_.intercept_
    return slope, b

At the moment I can do this with 'for' loops over "site" and "time", but that is very clumsy...

def clc_slope(_ds, window_size=11):
    slps = []
    bs = []
    mean_xs = []
    mean_ys = []

    var_x = _ds['x']
    var_y = _ds['y']

    # loop over each year and each day of the year
    for year in np.unique(_ds.time.dt.year.values):
        for doy in np.unique(_ds.sel(time=str(year)).time.dt.dayofyear.values):

            # define inorg and enorg (start and end index of the window);
            # int() replaces np.int, which was removed in NumPy 1.24
            inorg = doy - int(window_size / 2 + 1)
            enorg = doy + int(window_size / 2)

            # calculate mean values of x and y for each moving window
            mean_x = np.nanmean(var_x.sel(time=str(year))[inorg:enorg].values)
            mean_y = np.nanmean(var_y.sel(time=str(year))[inorg:enorg].values)

            mean_xs = np.append(mean_xs, mean_x)
            mean_ys = np.append(mean_ys, mean_y)  # was mean_x in the original
            # start to estimate slope and intercept
            _x = var_x.sel(time=str(year))[inorg:enorg].values
            _y = var_y.sel(time=str(year))[inorg:enorg].values

            # if there are too many NaNs, set slope and intercept to NaN
            if (np.isfinite(_x) & np.isfinite(_y)).sum() < int(window_size / 2) + 2:
                _slp = _b = np.nan
            else:
                try:
                    _slp, _b = ransac_fit(_x, _y, min_samples=0.6,
                                          stop_n_inliers=int(window_size / 2))
                except Exception:
                    _slp = _b = np.nan
            slps = np.append(slps, _slp)
            bs = np.append(bs, _b)
    outs = [slps, bs, mean_xs, mean_ys]
    return outs
# run the slope and intercept estimation for each site and concat afterwards
_dss = []
for st in _ds.site.values:
    # use a separate name so the full dataset _ds is not overwritten
    site_ds = _ds.sel(site=st)
    outs = clc_slope(site_ds)
    site_ds['slp'] = ('time', outs[0])
    site_ds['b'] = ('time', outs[1])
    site_ds['mean_xs'] = ('time', outs[2])
    site_ds['mean_ys'] = ('time', outs[3])
    _dss.append(site_ds)
dss = xr.concat(_dss, dim='site')

I know that xarray.apply_ufunc can save a lot of time, but I can't work out how to apply it to this tricky case. I would be grateful for any hints. Many thanks!

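# windows at the start and end of the series are padded with NaN, and each
# output must be a scalar, so ransac_fit has to handle both cases
rolled = _ds.rolling(time=11, center=True).construct("window")
slps, bs = xr.apply_ufunc(
    ransac_fit,
    rolled['x'],
    rolled['y'],
    input_core_dims=[['window'], ['window']],
    output_core_dims=[[], []],
    vectorize=True,
    dask='parallelized',  # only takes effect for dask-backed data
)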

This works by using rolling to create a third dimension (window), and then using apply_ufunc to broadcast over the site and time dimensions.
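A minimal sketch of how the call above might be made robust to the NaN-padded edge windows. It assumes that windows with too few finite pairs should yield NaN and that scalar outputs are wanted; `ransac_fit_scalar`, `min_valid`, and the synthetic y = 2x + 1 data are illustrative choices, not part of the original post.

```python
import numpy as np
import pandas as pd
import xarray as xr
from sklearn import linear_model


def ransac_fit_scalar(xi, yi, min_valid=7, **ransac_kwargs):
    """Fit y ~ x on one window and return (slope, intercept) as floats.

    Hypothetical helper: min_valid is an assumed threshold for the
    number of finite (x, y) pairs required before fitting.
    """
    ok = np.isfinite(xi) & np.isfinite(yi)
    if ok.sum() < min_valid:
        return np.nan, np.nan
    try:
        ransac = linear_model.RANSACRegressor(**ransac_kwargs)
        ransac.fit(xi[ok].reshape(-1, 1), yi[ok])
    except ValueError:
        # e.g. RANSAC could not find a valid consensus set
        return np.nan, np.nan
    return float(ransac.estimator_.coef_[0]), float(ransac.estimator_.intercept_)


# Synthetic data with a known relationship so the result is easy to check.
rng = np.random.default_rng(0)
time = pd.date_range("2018-01-01", periods=40, freq="D")
x = rng.uniform(0, 500, size=(2, time.size))
demo = xr.Dataset(
    {"x": (("site", "time"), x), "y": (("site", "time"), 2.0 * x + 1.0)},
    coords={"site": [0, 1], "time": time},
)

# One 11-element window per (site, time), NaN-padded at the edges.
rolled = demo.rolling(time=11, center=True).construct("window")
slope, intercept = xr.apply_ufunc(
    ransac_fit_scalar,
    rolled["x"],
    rolled["y"],
    input_core_dims=[["window"], ["window"]],
    output_core_dims=[[], []],
    vectorize=True,
    output_dtypes=[float, float],
    kwargs={"min_valid": 7, "random_state": 0},
)
```

Here `slope` and `intercept` come back as DataArrays with dimensions (site, time), so they can be assigned straight back onto the dataset. The fit still runs once per window in Python, so this tidies the loop rather than vectorizing the regression itself.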

This doesn't use apply_ufunc, but it does speed up the implementation considerably.

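xarray's rolling module has a very powerful feature called construct. It takes a rolling window and, instead of reducing it, expands it into a new dimension. xarray does this without copying the data: for each element along the rolling dimension it just provides a slice into the array, where each slice has the length of the window and is offset by one from the previous slice: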

In [3]: rolled = _ds.rolling(time=11).construct("window")
In [4]: rolled
Out[4]:
<xarray.Dataset>
Dimensions:  (site: 50, time: 1096, window: 11)
Coordinates:
  * site     (site) int8 0 0 0 0 0 0 0 0 0 0 0 0 0 ... 2 2 2 2 2 2 2 2 2 2 2 2 3
  * time     (time) datetime64[ns] 2018-01-01 2018-01-02 ... 2020-12-31
Dimensions without coordinates: window
Data variables:
    x        (site, time, window) float64 nan nan nan nan ... 350.0 9.0 303.0
    y        (site, time, window) float64 nan nan nan nan ... 246.0 351.0 310.0

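You can use this to perform arbitrary operations along each window. It is also helpful for prototyping complex window operations, because you can see exactly what is in each slice.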
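As a small illustration of inspecting the slices, here is a sketch on a toy five-element series (the array `da` and the window size of 3 are made up for the example). Each row of the constructed array is one window, with NaN padding at the edges.

```python
import numpy as np
import xarray as xr

# Toy series, purely for illustration.
da = xr.DataArray(np.arange(5.0), dims="time")

# Expand each centered 3-element window into a new "window" dimension.
windows = da.rolling(time=3, center=True).construct("window")
print(windows.values)  # one row per time step, NaN-padded at both ends

# Any reduction along "window" is now an ordinary array operation.
window_mean = windows.mean("window")  # NaNs are skipped by default

# The same result from the built-in rolling reduction, for comparison.
builtin_mean = da.rolling(time=3, center=True, min_periods=1).mean()
```

Comparing a hand-rolled reduction against the built-in one like this is a quick way to check that the windows line up the way you expect before plugging in a more complex function.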

Next, for each slice, we can stack the site and window dimensions to get all the observations for each regression you want in a single vector:

In [5]: stacked = rolled.stack(obs=("window", "site"))
In [6]: stacked
Out[6]:
<xarray.Dataset>
Dimensions:  (time: 1096, obs: 550)
Coordinates:
  * time     (time) datetime64[ns] 2018-01-01 2018-01-02 ... 2020-12-31
  * obs      (obs) MultiIndex
  - window   (obs) int64 0 0 0 0 0 0 0 0 0 0 0 ... 10 10 10 10 10 10 10 10 10 10
  - site     (obs) int64 0 0 0 0 0 0 0 0 0 0 0 0 0 2 2 2 2 2 2 2 2 2 2 2 2 3
Data variables:
    x        (time, obs) float64 nan nan nan nan nan ... 81.0 136.0 194.0 303.0
    y        (time, obs) float64 nan nan nan nan nan ... 470.0 300.0 329.0 310.0

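Now that we have this, we can wrap your regression function so that it accepts and returns a Dataset. I'll add a new dimension, coeff, because slope is a vector (you could also get a scalar slope with slope.item() and skip the extra dim):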

def ransac_fit_xr(ds, **ransac_kwargs):
    xi, yi = ds.x.values.ravel(), ds.y.values.ravel()
    # drop observations where either x or y is NaN
    mask = ~np.isnan(xi) & ~np.isnan(yi)
    # you could apply your masking rule here if you'd like:
    # if mask.sum() < len(mask) / 2:
    #     return xr.Dataset({"slope": (("coeff", ), [np.nan]), "b": np.nan})
    xi, yi = xi[mask], yi[mask]
    slope, b = ransac_fit(xi, yi, **ransac_kwargs)
    return xr.Dataset({"slope": (("coeff", ), slope), "b": b})

Now we can loop over the elements of time to build up our regression results:

In [22]: results = []
    ...: for i in stacked.time.values:
    ...:     results.append(ransac_fit_xr(stacked.sel(time=i, drop=True)))
    ...: res_ds = xr.concat(results, dim=stacked.time)
In [23]: res_ds
Out[23]:
<xarray.Dataset>
Dimensions:  (time: 1096, coeff: 1)
Coordinates:
  * time     (time) datetime64[ns] 2018-01-01 2018-01-02 ... 2020-12-31
Dimensions without coordinates: coeff
Data variables:
    slope    (time, coeff) float64 -0.1954 0.3 -0.0878 ... -0.1385 0.05444
    b        (time) float64 413.6 303.5 366.0 271.4 ... 342.1 256.4 362.8 303.

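This is reasonably fast. A persistent challenge I have with sklearn's estimators is that there is no good way to run tensor regressions, where you want to pass in a set of arguments, run the regression along some subset of the dimensions, and receive a set of outputs. xarray's polyfit does do this, but currently you can only run polynomial regressions. So if you have something more complex, like RANSACRegressor, you have to accept the performance hit of an outer loop. If you'd like, you can speed this up by parallelizing with map_blocks.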
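If an ordinary least-squares fit is acceptable in place of RANSAC (an assumption, since it gives up the outlier rejection the question asks for), the per-window regression can be written in closed form with plain xarray reductions and no Python loop at all. The sketch below is one way to do that; `rolling_ols`, `min_valid`, and the synthetic data are illustrative names and choices, not from the original post.

```python
import numpy as np
import pandas as pd
import xarray as xr


def rolling_ols(x, y, window=11, min_valid=7, dim="time"):
    """Closed-form OLS slope and intercept of y on x per centered window.

    Hypothetical helper: swaps RANSAC for ordinary least squares.
    """
    xw = x.rolling({dim: window}, center=True).construct("window")
    yw = y.rolling({dim: window}, center=True).construct("window")

    # Keep only positions where both x and y are present.
    valid = xw.notnull() & yw.notnull()
    xw, yw = xw.where(valid), yw.where(valid)

    x_mean, y_mean = xw.mean("window"), yw.mean("window")
    dx, dy = xw - x_mean, yw - y_mean

    # slope = cov(x, y) / var(x); NaNs are skipped by the reductions.
    slope = (dx * dy).sum("window") / (dx ** 2).sum("window")
    intercept = y_mean - slope * x_mean

    # Mask windows with too few valid pairs.
    enough = valid.sum("window") >= min_valid
    return slope.where(enough), intercept.where(enough)


# Synthetic data with a known relationship, y = 2x + 1.
rng = np.random.default_rng(0)
time = pd.date_range("2018-01-01", periods=40, freq="D")
x = xr.DataArray(
    rng.uniform(0, 500, size=(2, time.size)),
    dims=("site", "time"),
    coords={"site": [0, 1], "time": time},
)
y = 2.0 * x + 1.0

ols_slope, ols_intercept = rolling_ols(x, y)
```

Because every step is an array reduction, this form also works unchanged on dask-backed data. For RANSAC itself, the outer loop (or apply_ufunc with vectorize=True) is still needed.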
