Python使用卡尔曼滤波器来改进仿真,但结果更差



我对将卡尔曼滤波(KF)应用于以下预测问题时看到的行为有疑问。 我包含一个简单的代码示例。

目标:我想知道卡尔费休是否适合使用现在获得的测量结果(t)来改进未来一天(t+24小时)的预测/模拟结果。 目标是使预测尽可能接近测量值

假设:我们假设测量是完美的(即,如果我们能得到与测量结果完美匹配的预测,我们就很高兴)。

我们有一个测量变量(z,实际风速)和一个模拟变量(x,预测风速)。

模拟风速x是由NWP(数值天气预报)软件使用各种气象数据(对我来说是黑匣子)生成的。 模拟文件每天生成一次,每半小时包含一次数据。

我尝试使用标量卡尔曼滤波器使用我现在获得的测量值和现在的预测数据(t-24 小时前生成)来校正 t+24 小时预测。 作为参考,我使用了:http://www.swarthmore.edu/NatSci/echeeve1/Ref/Kalman/ScalarKalman.html

法典:

#! /usr/bin/python
import numpy as np
import pylab
import os

def main():
    # x = 336 data points of simulated wind speed for 7 days * 24 hour * 2 (every half an hour)
    # Imagine at time t, we will get a x_t fvalue or t+48 or a 24 hours later.
    x = load_x()
    # this is a list that will contain 336 data points of our corrected data
    x_sample_predict_list = []
    # z = 336 data points for 7 days * 24 hour * 2 of actual measured wind speed (every half an hour)
    z = load_z()
    # Here is the setup of the scalar kalman filter
    # reference: http://www.swarthmore.edu/NatSci/echeeve1/Ref/Kalman/ScalarKalman.html
    # state transition matrix (we simply have a scalar)
    # what you need to multiply the last time's state to get the newest state
    # we get the x_t+1 = A * x_t, since we get the x_t+1 directly for simulation
    # we will have a = 1
    a = 1.0
    # observation matrix
    # what you need to multiply to the state, convert it to the same form as incoming measurement 
    # both state and measurements are wind speed, so set h = 1
    h = 1.0
    Q = 16.0    # expected process variance of predicted Wind Speed
    R = 9.0 # expected measurement variance of Wind Speed
    p_j = Q # process covariance is equal to the initial process covariance estimate
    # Kalman gain is equal to k = hp-_j / (hp-_j + R).  With perfect measurement
    # R = 0, k reduces to k=1/h which is 1
    k = 1.0
    # one week data
    # original R2 = 0.183
    # with delay = 6, R2 = 0.295
    # with delay = 12, R2 = 0.147   
    # with delay = 48, R2 = 0.075
    delay = 6 
    # Kalman loop
    for t, x_sample in enumerate(x):
        if t <= delay:          
            # for the first day of the forecast,
            # we don't have forecast data and measurement 
            # from a day before to do correction
            x_sample_predict = x_sample             
        else: # t > 48
            # for a priori estimate we take x_sample as is
            # x_sample = x^-_j = a x^-_j_1 + b u_j
            # Inside the NWP (numerical weather prediction, 
            # the x_sample should be on x_sample_j-1 (assumption)
            x_sample_predict_prior = a * x_sample
            # we use the measurement from t-delay (ie. could be a day ago)
            # and forecast data from t-delay, to produce a leading residual that can be used to
            # correct the forecast.
            residual = z[t-delay] - h * x_sample_predict_list[t-delay]

            p_j_prior = a**2 * p_j + Q
            k = h * p_j_prior / (h**2 * p_j_prior + R)
            # we update our prediction based on the residual
            x_sample_predict = x_sample_predict_prior + k * residual
            p_j = p_j_prior * (1 - h * k)
            #print k
            #print p_j_prior
            #print p_j
            #raw_input()
        x_sample_predict_list.append(x_sample_predict)
    # initial goodness of fit
    R2_val_initial = calculate_regression(x,z)
    R2_string_initial = "R2 initial: {0:10.3f}, ".format(R2_val_initial)    
    print R2_string_initial     # R2_val_initial = 0.183
    # final goodness of fit
    R2_val_final = calculate_regression(x_sample_predict_list,z)
    R2_string_final = "R2 final: {0:10.3f}, ".format(R2_val_final)  
    print R2_string_final       # R2_val_final = 0.117, which is worse

    timesteps = xrange(len(x))      
    pylab.plot(timesteps,x,'r-', timesteps,z,'b:', timesteps,x_sample_predict_list,'g--')
    pylab.xlabel('Time')
    pylab.ylabel('Wind Speed')
    pylab.title('Simulated Wind Speed vs Actual Wind Speed')
    pylab.legend(('predicted','measured','kalman'))
    pylab.show()

def calculate_regression(x, y):         
    R2 = 0  
    A = np.array( [x, np.ones(len(x))] )
    model, resid = np.linalg.lstsq(A.T, y)[:2]  
    R2_val = 1 - resid[0] / (y.size * y.var())          
    return R2_val
def load_x():
    return np.array([2, 3, 3, 5, 4, 4, 4, 5, 5, 6, 5, 7, 7, 7, 8, 8, 8, 9, 9, 10, 10, 10, 11, 11,
     11, 10, 8, 8, 8, 8, 6, 3, 4, 5, 5, 5, 6, 5, 5, 5, 6, 5, 5, 6, 6, 7, 6, 8, 9, 10,
     12, 11, 10, 10, 10, 11, 11, 10, 8, 8, 9, 8, 9, 9, 9, 9, 8, 9, 8, 11, 11, 11, 12,
     12, 13, 13, 13, 13, 13, 13, 13, 14, 13, 13, 12, 13, 13, 12, 12, 13, 13, 12, 12, 
     11, 12, 12, 19, 18, 17, 15, 13, 14, 14, 14, 13, 12, 12, 12, 12, 11, 10, 10, 10, 
     10, 9, 9, 8, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 6, 6, 6, 7, 7, 8, 8, 8, 6, 5, 5, 
     5, 5, 5, 5, 6, 4, 4, 4, 6, 7, 8, 7, 7, 9, 10, 10, 9, 9, 8, 7, 5, 5, 5, 5, 5, 5, 
     5, 5, 6, 5, 5, 5, 4, 4, 6, 6, 7, 7, 7, 7, 6, 6, 5, 5, 4, 2, 2, 2, 1, 1, 1, 2, 3,
     13, 13, 12, 11, 10, 9, 10, 10, 8, 9, 8, 7, 5, 3, 2, 2, 2, 3, 3, 4, 4, 5, 6, 6,
     7, 7, 7, 6, 6, 6, 7, 6, 6, 5, 4, 4, 3, 3, 3, 2, 2, 1, 5, 5, 3, 2, 1, 2, 6, 7, 
     7, 8, 8, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 9, 9, 9, 9, 9, 8, 8, 8, 8, 7, 7, 
     7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 7, 11, 11, 11, 11, 10, 10, 9, 10, 10, 10, 2, 2,
     2, 3, 1, 1, 3, 4, 5, 8, 9, 9, 9, 9, 8, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 7,
     7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 7, 5, 5, 5, 5, 5, 6, 5])
def load_z():
    return np.array([3, 2, 1, 1, 1, 1, 3, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 2, 2, 2,
     2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 4, 4, 4, 5, 4, 4, 5, 5, 5, 6, 6,
     6, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 7, 8, 8, 8, 8, 8, 8, 9, 10, 9, 9, 10, 10, 9,
     9, 10, 9, 9, 10, 9, 8, 9, 9, 7, 7, 6, 7, 6, 6, 7, 7, 8, 8, 8, 8, 8, 8, 7, 6, 7,
     8, 8, 7, 8, 9, 9, 9, 9, 10, 9, 9, 9, 8, 8, 10, 9, 10, 10, 9, 9, 9, 10, 9, 8, 7, 
     7, 7, 7, 8, 7, 6, 5, 4, 3, 5, 3, 5, 4, 4, 4, 2, 4, 3, 2, 1, 1, 2, 1, 2, 1, 4, 4,
     4, 4, 4, 3, 3, 3, 1, 1, 1, 1, 2, 3, 3, 2, 3, 3, 3, 2, 2, 5, 4, 2, 5, 4, 1, 1, 1, 
     1, 1, 1, 1, 2, 2, 1, 1, 3, 3, 3, 3, 3, 4, 3, 4, 3, 4, 4, 4, 4, 3, 3, 4, 4, 4, 4,
     4, 4, 5, 5, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 1, 2, 2, 3, 3, 1, 2, 1, 1, 2, 4, 3, 1,
     1, 2, 0, 0, 0, 2, 1, 0, 0, 2, 3, 2, 4, 4, 3, 3, 4, 5, 5, 5, 4, 5, 4, 4, 4, 5, 5, 
     4, 3, 3, 4, 4, 4, 3, 3, 3, 4, 4, 4, 5, 5, 5, 4, 5, 5, 5, 5, 6, 5, 5, 8, 9, 8, 9,
     9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 9, 10, 9, 8, 8, 9, 8, 9, 9, 10, 9, 9, 9,
     7, 7, 9, 8, 7, 6, 6, 5, 5, 5, 5, 3, 3, 3, 4, 6, 5, 5, 6, 5])
if __name__ == '__main__': main()  # this avoids executing main on import your_module

-------------------------

观察:

1)如果昨天的预测是过度预测(正偏差),那么今天,我会通过减去偏差来进行修正。 在实践中,如果今天我碰巧预测不足,那么减去正偏差会导致更糟糕的预测。 我实际上观察到数据波动更大,整体拟合度较差。 我的例子有什么问题?

2)大多数卡尔曼滤波

资源表明卡尔曼滤波最小化后验协方差p_j = E{(x_j – x^_j)},并且证明选择K以最小化p_j。 但是有人能解释一下最小化后验协方差实际上如何最小化过程白噪声的影响吗? 在实时情况下,假设实际风速和实测风速为 5 m/s。 预测风速为6米/秒,即。有 W = 1 m/s 的噪音。 残差为 5 – 6 = -1 m/s。 您可以通过从预测中取 1 m/s 来纠正,以返回 5 m/s。 这就是将过程噪声的影响降至最低的方式吗?

3)这是一篇提到应用钦哲基金会进行平滑天气预报的论文。http://hal.archives-ouvertes.fr/docs/00/50/59/93/PDF/Louka_etal_jweia2008.pdf。 有趣的是,在第9等式(7)中,"一旦知道新的观测值y_t,时间t处x的估计值就变为x_t = x_t/t-1 = K(y_t – H_t * x_t/t-1)"。 如果我要参考实际时间来解释它,那么"一旦现在知道新的观测值,估计现在就变得x_t......" 我了解钦哲基金会如何使您的数据实时接近测量。 但是,如果您使用来自 t=now 的测量数据更正 t=now 的预测数据,那么这怎么会成为预测呢?

谢谢!

UPDATE1:

4) 如果我们希望卡尔曼处理数据与测量数据时间序列之间的 R2 从未处理的数据与测量数据改进,我在代码中添加了一个延迟,以调查预测比从当前测量计算的当前偏差晚多少。 在此示例中,如果测量用于改进预测 6 个时间步长(从现在起 3 小时),它仍然有用(R2 从 0.183 变为 0.295)。 但是,如果测量值用于改善 1 天后的预测,那么它会破坏相关性(R2 下降到 0.075)。

我更新了我的测试标量实现,但没有假设完美测量R为1,这就是将卡尔曼增益降低到恒定值1的原因。 现在,我看到了时间序列的改进,减少了RMSE误差。

#! /usr/bin/python
import numpy as np
import pylab
import os
# RMSE improved
def main():
    # x = 336 data points of simulated wind speed for 7 days * 24 hour * 2 (every half an hour)
    # Imagine at time t, we will get a x_t fvalue or t+48 or a 24 hours later.
    x = load_x()
    # this is a list that will contain 336 data points of our corrected data
    x_sample_predict_list = []
    # z = 336 data points for 7 days * 24 hour * 2 of actual measured wind speed (every half an hour)
    z = load_z()
    # Here is the setup of the scalar kalman filter
    # reference: http://www.swarthmore.edu/NatSci/echeeve1/Ref/Kalman/ScalarKalman.html
    # state transition matrix (we simply have a scalar)
    # what you need to multiply the last time's state to get the newest state
    # we get the x_t+1 = A * x_t, since we get the x_t+1 directly for simulation
    # we will have a = 1
    a = 1.0
    # observation matrix
    # what you need to multiply to the state, convert it to the same form as incoming measurement 
    # both state and measurements are wind speed, so set h = 1
    h = 1.0
    Q = 1.0     # expected process noise of predicted Wind Speed    
    R = 1.0     # expected measurement noise of Wind Speed
    p_j = Q # process covariance is equal to the initial process covariance estimate
    # Kalman gain is equal to k = hp-_j / (hp-_j + R).  With perfect measurement
    # R = 0, k reduces to k=1/h which is 1
    k = 1.0
    # one week data
    # original R2 = 0.183
    # with delay = 6, R2 = 0.295
    # with delay = 12, R2 = 0.147   
    # with delay = 48, R2 = 0.075
    delay = 6 
    # Kalman loop
    for t, x_sample in enumerate(x):
        if t <= delay:          
            # for the first day of the forecast,
            # we don't have forecast data and measurement 
            # from a day before to do correction
            x_sample_predict = x_sample             
        else: # t > 48
            # for a priori estimate we take x_sample as is
            # x_sample = x^-_j = a x^-_j_1 + b u_j
            # Inside the NWP (numerical weather prediction, 
            # the x_sample should be on x_sample_j-1 (assumption)
            x_sample_predict_prior = a * x_sample
            # we use the measurement from t-delay (ie. could be a day ago)
            # and forecast data from t-delay, to produce a leading residual that can be used to
            # correct the forecast.
            residual = z[t-delay] - h * x_sample_predict_list[t-delay]
            p_j_prior = a**2 * p_j + Q
            k = h * p_j_prior / (h**2 * p_j_prior + R)
            # we update our prediction based on the residual
            x_sample_predict = x_sample_predict_prior + k * residual
            p_j = p_j_prior * (1 - h * k)
            #print k
            #print p_j_prior
            #print p_j
            #raw_input()
        x_sample_predict_list.append(x_sample_predict)
    # initial goodness of fit
    R2_val_initial = calculate_regression(x,z)
    R2_string_initial = "R2 original: {0:10.3f}, ".format(R2_val_initial)   
    print R2_string_initial     # R2_val_original = 0.183
    original_RMSE = (((x-z)**2).mean())**0.5
    print "original_RMSE"
    print original_RMSE 
    print "n"
    # final goodness of fit
    R2_val_final = calculate_regression(x_sample_predict_list,z)
    R2_string_final = "R2 final: {0:10.3f}, ".format(R2_val_final)  
    print R2_string_final       # R2_val_final = 0.267, which is better
    final_RMSE = (((x_sample_predict-z)**2).mean())**0.5
    print "final_RMSE"
    print final_RMSE    
    print "n"

    timesteps = xrange(len(x))      
    pylab.plot(timesteps,x,'r-', timesteps,z,'b:', timesteps,x_sample_predict_list,'g--')
    pylab.xlabel('Time')
    pylab.ylabel('Wind Speed')
    pylab.title('Simulated Wind Speed vs Actual Wind Speed')
    pylab.legend(('predicted','measured','kalman'))
    pylab.show()

def calculate_regression(x, y):         
    R2 = 0  
    A = np.array( [x, np.ones(len(x))] )
    model, resid = np.linalg.lstsq(A.T, y)[:2]  
    R2_val = 1 - resid[0] / (y.size * y.var())          
    return R2_val
def load_x():
    return np.array([2, 3, 3, 5, 4, 4, 4, 5, 5, 6, 5, 7, 7, 7, 8, 8, 8, 9, 9, 10, 10, 10, 11, 11,
     11, 10, 8, 8, 8, 8, 6, 3, 4, 5, 5, 5, 6, 5, 5, 5, 6, 5, 5, 6, 6, 7, 6, 8, 9, 10,
     12, 11, 10, 10, 10, 11, 11, 10, 8, 8, 9, 8, 9, 9, 9, 9, 8, 9, 8, 11, 11, 11, 12,
     12, 13, 13, 13, 13, 13, 13, 13, 14, 13, 13, 12, 13, 13, 12, 12, 13, 13, 12, 12, 
     11, 12, 12, 19, 18, 17, 15, 13, 14, 14, 14, 13, 12, 12, 12, 12, 11, 10, 10, 10, 
     10, 9, 9, 8, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 6, 6, 6, 7, 7, 8, 8, 8, 6, 5, 5, 
     5, 5, 5, 5, 6, 4, 4, 4, 6, 7, 8, 7, 7, 9, 10, 10, 9, 9, 8, 7, 5, 5, 5, 5, 5, 5, 
     5, 5, 6, 5, 5, 5, 4, 4, 6, 6, 7, 7, 7, 7, 6, 6, 5, 5, 4, 2, 2, 2, 1, 1, 1, 2, 3,
     13, 13, 12, 11, 10, 9, 10, 10, 8, 9, 8, 7, 5, 3, 2, 2, 2, 3, 3, 4, 4, 5, 6, 6,
     7, 7, 7, 6, 6, 6, 7, 6, 6, 5, 4, 4, 3, 3, 3, 2, 2, 1, 5, 5, 3, 2, 1, 2, 6, 7, 
     7, 8, 8, 9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 9, 9, 9, 9, 9, 8, 8, 8, 8, 7, 7, 
     7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 7, 11, 11, 11, 11, 10, 10, 9, 10, 10, 10, 2, 2,
     2, 3, 1, 1, 3, 4, 5, 8, 9, 9, 9, 9, 8, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 7,
     7, 7, 7, 8, 8, 8, 8, 8, 8, 8, 8, 7, 5, 5, 5, 5, 5, 6, 5])
def load_z():
    return np.array([3, 2, 1, 1, 1, 1, 3, 3, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 3, 2, 1, 1, 2, 2, 2,
     2, 2, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 3, 4, 4, 4, 4, 5, 4, 4, 5, 5, 5, 6, 6,
     6, 7, 8, 8, 8, 8, 8, 8, 8, 8, 8, 8, 7, 8, 8, 8, 8, 8, 8, 9, 10, 9, 9, 10, 10, 9,
     9, 10, 9, 9, 10, 9, 8, 9, 9, 7, 7, 6, 7, 6, 6, 7, 7, 8, 8, 8, 8, 8, 8, 7, 6, 7,
     8, 8, 7, 8, 9, 9, 9, 9, 10, 9, 9, 9, 8, 8, 10, 9, 10, 10, 9, 9, 9, 10, 9, 8, 7, 
     7, 7, 7, 8, 7, 6, 5, 4, 3, 5, 3, 5, 4, 4, 4, 2, 4, 3, 2, 1, 1, 2, 1, 2, 1, 4, 4,
     4, 4, 4, 3, 3, 3, 1, 1, 1, 1, 2, 3, 3, 2, 3, 3, 3, 2, 2, 5, 4, 2, 5, 4, 1, 1, 1, 
     1, 1, 1, 1, 2, 2, 1, 1, 3, 3, 3, 3, 3, 4, 3, 4, 3, 4, 4, 4, 4, 3, 3, 4, 4, 4, 4,
     4, 4, 5, 5, 5, 4, 3, 3, 3, 3, 3, 3, 3, 3, 1, 2, 2, 3, 3, 1, 2, 1, 1, 2, 4, 3, 1,
     1, 2, 0, 0, 0, 2, 1, 0, 0, 2, 3, 2, 4, 4, 3, 3, 4, 5, 5, 5, 4, 5, 4, 4, 4, 5, 5, 
     4, 3, 3, 4, 4, 4, 3, 3, 3, 4, 4, 4, 5, 5, 5, 4, 5, 5, 5, 5, 6, 5, 5, 8, 9, 8, 9,
     9, 9, 9, 9, 10, 10, 10, 10, 10, 10, 10, 9, 10, 9, 8, 8, 9, 8, 9, 9, 10, 9, 9, 9,
     7, 7, 9, 8, 7, 6, 6, 5, 5, 5, 5, 3, 3, 3, 4, 6, 5, 5, 6, 5])
if __name__ == '__main__': main()  # this avoids executing main on import your_module

这条线不遵守标量卡尔曼滤波器:

residual = z[t-delay] - h * x_sample_predict_list[t-delay]

在我看来,你应该这样做:

 residual = z[t -delay] - h * x_sample_predict_prior

最新更新