添加 'ignore_warnings' 装饰器后出现 TypeError: issubclass() arg 2 must be a class, a tuple of classes, or a union



我试图创建一个后向选择(backward selection)的 Lasso 回归模型,但遇到了一个奇怪的问题。这是我的代码:

import pandas as pd
from sklearn import preprocessing
import statsmodels.api as sm
from sklearn.linear_model import RidgeCV
from sklearn.linear_model import LassoCV
from sklearn.utils._testing import ignore_warnings
from sklearn.exceptions import ConvergenceWarning
from sklearn.metrics import mean_squared_error as mse
# settings
# Module-level settings. Among the code visible in this file, `ETH_index` is
# read by `split_df` and `y_label` by the driver script at the bottom; the
# other names mirror function defaults or are not referenced here.
shift_n = 1
previous = True
intercept = True
alpha = [0.01, 0.05, 0.1, 0.5, 1]
threshold = 0.05
y_mark = 'Close.'
var_list = [
    'Ethereum',
    'S&P 500',
    'Nasdaq',
    'DJ Composite',
    'Gold',
    'Copper',
    'Silver',
    'Crude Oil WTI',
    'Natural Gas',
]
# Target column name: the `y_mark` prefix followed by the asset name.
y_label = f'{y_mark}Ethereum'
ETH_index = 69

# function to get df shift
def df_shift(dataset, y_label, shift_n=1):
    """Return a copy of *dataset* with a lagged version of the target column.

    The extra column is named ``y_label + '_p'`` and holds
    ``dataset[y_label]`` shifted by ``shift_n`` periods. Rows that contain any
    missing value (including those created by the shift) are dropped. The
    input frame itself is left untouched.
    """
    lagged = dataset.copy()
    lagged[y_label + '_p'] = lagged[y_label].shift(periods=shift_n)
    return lagged.dropna()

# function to split df into trainset and testset
def split_df(df):
    """Split *df* into a leading train part and a trailing test part.

    The usable data starts at the later of the first non-zero entry
    (``df.ne(0).idxmax()``) and the module-level ``ETH_index``. The cut-off
    label sits 80% of the way through the remaining rows; rows whose index is
    at or before the cut-off form the train set, later rows the test set.

    NOTE(review): ``idxmax()`` returns an index *label* that is then used as a
    position, and the scalar comparison only works if it yields a scalar -
    this presumably expects a Series with a default integer index; confirm
    against callers.

    Returns:
        ``(trainset, testset)``
    """
    first_nonzero = df.ne(0).idxmax()
    start = first_nonzero if first_nonzero > ETH_index else ETH_index
    # Label located 80% of the way through the rows that remain after `start`.
    cutoff = df.index[start + int((len(df) - start) * 0.8)]
    tail = df[start:].copy()
    trainset = tail[tail.index <= cutoff]
    testset = tail[tail.index > cutoff]
    return trainset, testset

# function to normalize df
def df_preprocessing(df, type='standardize'):
    """Scale every column of *df* and return ``(scaled_frame, fitted_scaler)``.

    ``type`` picks the scaler: ``'standardize'`` uses
    ``preprocessing.StandardScaler`` and ``'minmax'`` uses
    ``preprocessing.MinMaxScaler``. The scaled frame keeps the columns and
    index of *df*. Any other ``type`` yields ``None``.
    """
    values = df.values
    if type == 'standardize':
        scaler = preprocessing.StandardScaler()
    elif type == 'minmax':
        scaler = preprocessing.MinMaxScaler()
    else:
        # Unrecognised scaler name: callers receive None.
        return None
    scaler = scaler.fit(values)
    scaled = pd.DataFrame(scaler.transform(values), columns=df.columns, index=df.index)
    return scaled, scaler

# function to get data for modelling
def get_data(df, y_label, preprocess='standardize', intercept=True):
    """Build the design matrix and target frame used for modelling.

    Args:
        df: Source frame containing the target column and all predictors.
        y_label: Name of the target column.
        preprocess: ``'standardize'`` or ``'minmax'`` to scale the predictors
            with ``df_preprocessing``; any other value leaves them unscaled.
        intercept: When equal to ``True``, a constant column is added with
            ``sm.add_constant``.

    Returns:
        ``(X, Y, scaler)`` where ``Y`` is a one-column DataFrame and
        ``scaler`` is the fitted scaler, or ``None`` if no scaling was done.
    """
    # 01 split X and Y
    is_target = df.columns == y_label
    X = df.loc[:, ~is_target]
    Y = df.loc[:, is_target]
    # 02 preprocess
    scaler = None
    if preprocess in ('standardize', 'minmax'):
        X, scaler = df_preprocessing(X, type=preprocess)
    # 03 add constant term
    if intercept == True:  # noqa: E712 - equality, not truthiness, is the existing contract
        X = sm.add_constant(X)
    return X, Y, scaler

def alpha_search(x, y, alpha=[0.01, 0.05, 0.1, 0.5, 1], type='lasso'):
    """Pick a regularization strength by cross-validation.

    ``type='ridge'`` fits ``RidgeCV`` and ``type='lasso'`` fits ``LassoCV``
    over the candidate values in ``alpha``; the fitted estimator's ``alpha_``
    is returned. Any other ``type`` returns ``None``.

    The list default is only read and passed on here, never mutated.
    """
    if type == 'ridge':
        searcher = RidgeCV(alphas=alpha)
    elif type == 'lasso':
        searcher = LassoCV(alphas=alpha)
    else:
        return None
    return searcher.fit(x, y).alpha_

def liner_model(X, Y, type='lasso', alpha=None):
    """Fit an OLS model and, for ``'ridge'``/``'lasso'``, a regularized refit.

    An unpenalized ``sm.OLS(Y, X)`` fit is always computed first. For
    ``type='ridge'`` (``L1_wt=0``) or ``type='lasso'`` (``L1_wt=1``) the
    penalty weight is chosen by ``alpha_search`` and the model is refitted
    with ``fit_regularized``, starting from the unpenalized parameters. The
    regularized parameters are then wrapped in an ``OLSResults`` object built
    from the model's ``normalized_cov_params``, which is what exposes the
    ``pvalues`` that ``backward_selection`` reads.

    Returns:
        ``(results, best_alpha)``. For any other ``type`` the plain OLS
        results are returned together with ``None``.
    """
    ols = sm.OLS(Y, X)
    plain_fit = ols.fit()

    if type == 'ridge':
        kind, l1_weight = 'ridge', 0
    elif type == 'lasso':
        kind, l1_weight = 'lasso', 1
    else:
        return plain_fit, None

    best_alpha = alpha_search(X, Y, alpha=alpha, type=kind)
    penalized = ols.fit_regularized(L1_wt=l1_weight, alpha=best_alpha, start_params=plain_fit.params)
    results = sm.regression.linear_model.OLSResults(ols, penalized.params, ols.normalized_cov_params)
    return results, best_alpha

def backward_selection(df, y_label, type='lasso', alpha=[0.01, 0.05, 0.1, 0.5, 1], threshold=0.05):
    """Drop predictors one at a time until every p-value is within *threshold*.

    The design matrix comes from ``get_data`` and each candidate model from
    ``liner_model``. On every pass the column with the largest p-value is
    removed and the model is refitted.

    Args:
        df: Frame holding the target column and all candidate predictors.
        y_label: Name of the target column.
        type: Model type forwarded to ``liner_model``.
        alpha: Candidate regularization strengths forwarded to ``liner_model``
            (only read here, never mutated).
        threshold: Largest p-value a retained column may have.

    Returns:
        ``(model, X, best_alpha, scaler)`` where ``X`` is the design matrix
        that ``model`` was fitted on.
    """
    X, Y, scaler = get_data(df, y_label=y_label)
    # create linear model
    model, best_alpha = liner_model(X, Y, type=type, alpha=alpha)
    # backward selection model
    pvalues = list(model.pvalues)
    max_p = max(pvalues)
    while max_p > threshold:
        # the column whose p-value is largest is the next one to go
        worst_col = X.columns[pvalues.index(max_p)]
        remaining = X.drop(worst_col, axis=1)
        if len(remaining.columns) == 0:
            # Nothing would be left to fit. Return the last fitted model
            # together with the matrix it was fitted on, so the returned
            # columns always match the returned model's parameters.
            print('all features have been removed, return the last avaiable model')
            return model, X, best_alpha, scaler
        X = remaining
        model, best_alpha = liner_model(X, Y, type=type, alpha=alpha)
        pvalues = list(model.pvalues)
        max_p = max(pvalues)
    # return result
    return model, X, best_alpha, scaler

class backward_selection_model:
    """Backward-selection regression fitted on a lagged copy of a data frame.

    Construction lags the target with ``df_shift`` and immediately runs
    ``backward_selection``; the fitted model, the retained design matrix, the
    chosen alpha and the fitted scaler are stored on the instance.
    """

    def __init__(self, df, y_label, type='lasso', alpha=[0.01, 0.05, 0.1, 0.5, 1], threshold=0.05):
        self.original_df = df.copy()
        self.df = df_shift(df, y_label)
        self.y_label = y_label
        self.type = type
        self.alpha = alpha
        self.threshold = threshold
        # Run the selection once and keep all four results.
        self.model, self.X, self.best_alpha, self.scaler = backward_selection(
            self.df,
            y_label=self.y_label,
            type=self.type,
            alpha=self.alpha,
            threshold=self.threshold,
        )

    def get_model(self):
        """Return the fitted model."""
        return self.model

    def get_final_features(self):
        """Return the column labels of the retained design matrix."""
        return self.X.columns

    def get_best_alpha(self):
        """Return the alpha reported by ``backward_selection``."""
        return self.best_alpha

    def get_scaler(self):
        """Return the scaler fitted during selection."""
        return self.scaler

    def get_prediction(self):
        """Predict the target for every row of the lagged frame.

        The predictors are scaled with the stored scaler, a constant column is
        added, and only the retained columns are passed to the model.
        """
        scaler = self.get_scaler()
        kept_cols = list(self.get_final_features())
        model = self.get_model()
        frame = self.df.copy()
        predictors = frame.loc[:, frame.columns != self.y_label]
        scaled = pd.DataFrame(
            scaler.transform(predictors),
            columns=predictors.columns,
            index=predictors.index,
        )
        design = sm.add_constant(scaled).loc[:, kept_cols]
        return model.predict(design)

    def get_mse(self):
        """Return the mean squared error of the predictions on the lagged frame."""
        predicted = self.get_prediction()
        frame = self.df.copy()
        actual = frame.loc[:, frame.columns == self.y_label].values.ravel()
        return mse(actual, predicted)

    def get_coef_df(self):
        """Return a frame with the model's coefficients and p-values."""
        fitted = self.model
        return pd.DataFrame({'coef': fitted.params, 'P-value': fitted.pvalues})

# Download the sample data set, index it by date, fit the model and report its MSE.
test_path = 'https://raw.githubusercontent.com/Carloszone/Cryptocurrency_Research_project/main/datasets/test.csv'
df = pd.read_csv(test_path, parse_dates=['Date'])
df = df.set_index('Date')
test = backward_selection_model(df, y_label)
model_mse = test.get_mse()
print('Model MSE: ', model_mse)

代码可以运行,我也得到了需要的结果。但我发现运行过程中出现了很多 ConvergenceWarning,所以我在 backward_selection 上加了一个装饰器(包装器)来屏蔽这些警告,比如:

# NOTE: `category` must be a class or a tuple of classes. Passing a list
# (`category=[ConvergenceWarning, UserWarning]`) makes the first warning that
# is emitted inside the wrapped call fail with
# "TypeError: issubclass() arg 2 must be a class, a tuple of classes, or a union".
@ignore_warnings(category=(ConvergenceWarning, UserWarning))
def backward_selection(df, y_label, type='lasso', alpha=[0.01, 0.05, 0.1, 0.5, 1], threshold=0.05):
    """Drop predictors one at a time until every p-value is within *threshold*.

    ``ConvergenceWarning`` and ``UserWarning`` raised while fitting are
    silenced by the decorator. The design matrix comes from ``get_data`` and
    each candidate model from ``liner_model``; on every pass the column with
    the largest p-value is removed and the model is refitted.

    Args:
        df: Frame holding the target column and all candidate predictors.
        y_label: Name of the target column.
        type: Model type forwarded to ``liner_model``.
        alpha: Candidate regularization strengths forwarded to ``liner_model``
            (only read here, never mutated).
        threshold: Largest p-value a retained column may have.

    Returns:
        ``(model, X, best_alpha, scaler)`` where ``X`` is the design matrix
        that ``model`` was fitted on.
    """
    X, Y, scaler = get_data(df, y_label=y_label)
    # create linear model
    model, best_alpha = liner_model(X, Y, type=type, alpha=alpha)
    # backward selection model
    pvalues = list(model.pvalues)
    max_p = max(pvalues)
    while max_p > threshold:
        # the column whose p-value is largest is the next one to go
        worst_col = X.columns[pvalues.index(max_p)]
        remaining = X.drop(worst_col, axis=1)
        if len(remaining.columns) == 0:
            # Nothing would be left to fit. Return the last fitted model
            # together with the matrix it was fitted on, so the returned
            # columns always match the returned model's parameters.
            print('all features have been removed, return the last avaiable model')
            return model, X, best_alpha, scaler
        X = remaining
        model, best_alpha = liner_model(X, Y, type=type, alpha=alpha)
        pvalues = list(model.pvalues)
        max_p = max(pvalues)
    # return result
    return model, X, best_alpha, scaler

然而,我得到了一个错误:TypeError: issubclass() arg 2 must be a class, a tuple of classes, or a union(issubclass() 的第 2 个参数必须是一个类、类的元组或联合类型)

Traceback (most recent call last):
File "...cryptoappmodel.py", line 202, in <module>
test = backward_selection_model(df, y_label)
File "...cryptoappmodel.py", line 150, in __init__
model, X, best_alpha, scaler = backward_selection(self.df, y_label=self.y_label, type=self.type,
File "...venvlibsite-packagessklearnutils_testing.py", line 313, in wrapper
return fn(*args, **kwargs)
File "...cryptoappmodel.py", line 116, in backward_selection
model, best_alpha = liner_model(X, Y, type=type, alpha=alpha)
File "C:UserscarloPycharmProjectsETH_transaction_fee_Studycryptoappmodel.py", line 103, in liner_model
best_alpha = alpha_search(X, Y, alpha=alpha, type='lasso')
File "...cryptoappmodel.py", line 89, in alpha_search
model_cv = lasso_cv.fit(x, y)
File "...venvlibsite-packagessklearnlinear_model_coordinate_descent.py", line 1571, in fit
y = column_or_1d(y, warn=True)
File "...venvlibsite-packagessklearnutilsvalidation.py", line 1029, in column_or_1d
warnings.warn(
TypeError: issubclass() arg 2 must be a class, a tuple of classes, or a union

很明显是这个装饰器(包装器)导致了错误,但我不知道如何修复它。

我想我找到了解决方案,其实非常简单:只需用圆括号 ()(元组)代替方括号 [](列表):

@ignore_warnings(category=(ConvergenceWarning, UserWarning))

另外还有一个发现:如果你遇到类似“DataConversionWarning: A column-vector y was passed when a 1d array was expected”(需要一维数组时却传入了列向量 y)的警告,并且你的 Y 是 pandas DataFrame 的一部分,可以试试这个:

# Flatten the single-column DataFrame target into a 1-D array before fitting.
y.values.ravel()

最新更新