GridSearchCV cannot use the Pipeline memory parameter together with parallelism (n_jobs > 1)



I am using a Pipeline as the estimator for GridSearchCV, which works fine. However, if I enable caching via the memory parameter and set n_jobs to a value greater than 1, the score columns of cv_results_ are all NaN and the search finishes in a few seconds instead of several minutes.

Is it not possible to use the caching feature together with GridSearchCV, or am I doing something wrong?

gsCV = GridSearchCV(
    estimator=Pipeline(
        # memory='../Cache/AW1MP_N10_DIN276_Pipeline', # not working if enabled
        steps=[
            ('we', FastTextTransformer()),
            ('se', AverageWordVectorTransformer()),
            ('rf', RandomForestClassifier())
        ]
    ),
    param_grid=[
        {
            'we__min_count': [5],
            'we__size': [64],
            'we__window': [5],
            'we__min_n': [3],
            'we__max_n': [6],
            'rf__n_estimators': [1, 2, 3, 4, 5, 10],  # 25, 64, 128], # number of trees in the forest
            'rf__criterion': ['gini'],  # 'entropy'], # split criterion
            'rf__max_features': ['auto'],  # number of features per tree
            'rf__max_depth': [4, 8, 16]  # , 32, 64, 128]
        }
    ],
    cv=CV,
    verbose=VERBOSE,
    n_jobs=N_JOBS,
    return_train_score=True,
    scoring=None
)
gsCV.fit(X_train, label_encoder.inverse_transform(Y_train).reshape(-1))
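
As a debugging aid (not part of the original post), the memory argument also accepts a joblib.Memory object, whose verbose output makes the caching and hashing activity visible. The cache location below is a placeholder; FastTextTransformer and AverageWordVectorTransformer are the custom transformers from the code above:

# Hedged sketch: pass a joblib.Memory object instead of a plain path so that
# cache writes and reads are logged. The cache location is a placeholder.
from joblib import Memory
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestClassifier

cache = Memory(location='../Cache/pipeline_cache', verbose=1)
pipe = Pipeline(
    memory=cache,
    steps=[
        ('we', FastTextTransformer()),           # custom transformer from the question
        ('se', AverageWordVectorTransformer()),  # custom transformer from the question
        ('rf', RandomForestClassifier())
    ]
)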

Output without the memory parameter

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done   1 tasks      | elapsed:   28.9s
[Parallel(n_jobs=6)]: Done   6 tasks      | elapsed:   29.4s
[Parallel(n_jobs=6)]: Done  13 tasks      | elapsed:  1.5min
[Parallel(n_jobs=6)]: Done  20 tasks      | elapsed:  2.0min
[Parallel(n_jobs=6)]: Done  29 tasks      | elapsed:  2.5min
[Parallel(n_jobs=6)]: Done  38 tasks      | elapsed:  3.5min
[Parallel(n_jobs=6)]: Done  49 tasks      | elapsed:  4.5min
[Parallel(n_jobs=6)]: Done  60 tasks      | elapsed:  5.1min
[Parallel(n_jobs=6)]: Done  73 tasks      | elapsed:  6.6min
[Parallel(n_jobs=6)]: Done  90 out of  90 | elapsed:  7.6min finished

Output with the memory parameter set to a path

[Parallel(n_jobs=6)]: Using backend LokyBackend with 6 concurrent workers.
[Parallel(n_jobs=6)]: Done   1 tasks      | elapsed:    3.3s
[Parallel(n_jobs=6)]: Done   6 tasks      | elapsed:    3.3s
[Parallel(n_jobs=6)]: Done  13 tasks      | elapsed:    3.7s
[Parallel(n_jobs=6)]: Done  20 tasks      | elapsed:    4.0s
[Parallel(n_jobs=6)]: Done  29 tasks      | elapsed:    4.3s
[Parallel(n_jobs=6)]: Done  38 tasks      | elapsed:    4.7s
[Parallel(n_jobs=6)]: Done  49 tasks      | elapsed:    5.0s
[Parallel(n_jobs=6)]: Done  60 tasks      | elapsed:    5.4s
[Parallel(n_jobs=6)]: Done  73 tasks      | elapsed:    5.9s
[Parallel(n_jobs=6)]: Done  90 out of  90 | elapsed:    6.4s finished
C:\Users\username\anaconda3\envs\SDaC\lib\site-packages\sklearn\pipeline.py:296: UserWarning: Persisting input arguments took 1.40s to run.
If this happens often in your code, it can cause performance problems 
(results will be correct in all cases). 
The reason for this is probably some large input arguments for a wrapped
function (e.g. large strings).
THIS IS A JOBLIB ISSUE. If you can, kindly provide the joblib's team with an
example so that they can fix the problem.
**fit_params_steps[name])
C:\Users\username\anaconda3\envs\SDaC\lib\site-packages\sklearn\pipeline.py:296: UserWarning: Persisting input arguments took 5.32s to run.
If this happens often in your code, it can cause performance problems 
(results will be correct in all cases). 
The reason for this is probably some large input arguments for a wrapped
function (e.g. large strings).
THIS IS A JOBLIB ISSUE. If you can, kindly provide the joblib's team with an
example so that they can fix the problem.
**fit_params_steps[name])

Setting error_score='raise' in the GridSearchCV constructor surfaces the underlying exception instead of silently recording NaN scores.

Output
The above exception was the direct cause of the following exception:
PicklingError                             Traceback (most recent call last)
<ipython-input-247-f1d887547f42> in <module>
19 )
20 
---> 21 gsCV_clf.fit(X_train, label_encoder.inverse_transform(Y_train).reshape(-1)) # use class because of Random Forest Classifier
22 print('hi')
~\anaconda3\envs\SDaC\lib\site-packages\sklearn\utils\validation.py in inner_f(*args, **kwargs)
70                           FutureWarning)
71         kwargs.update({k: arg for k, arg in zip(sig.parameters, args)})
---> 72         return f(**kwargs)
73     return inner_f
74 
~\anaconda3\envs\SDaC\lib\site-packages\sklearn\model_selection\_search.py in fit(self, X, y, groups, **fit_params)
734                 return results
735 
--> 736             self._run_search(evaluate_candidates)
737 
738         # For multi-metric evaluation, store the best_index_, best_params_ and
~\anaconda3\envs\SDaC\lib\site-packages\sklearn\model_selection\_search.py in _run_search(self, evaluate_candidates)
1186     def _run_search(self, evaluate_candidates):
1187         """Search all candidates in param_grid"""
-> 1188         evaluate_candidates(ParameterGrid(self.param_grid))
1189 
1190 
~\anaconda3\envs\SDaC\lib\site-packages\sklearn\model_selection\_search.py in evaluate_candidates(candidate_params)
713                                for parameters, (train, test)
714                                in product(candidate_params,
--> 715                                           cv.split(X, y, groups)))
716 
717                 if len(out) < 1:
~\anaconda3\envs\SDaC\lib\site-packages\joblib\parallel.py in __call__(self, iterable)
1052 
1053             with self._backend.retrieval_context():
-> 1054                 self.retrieve()
1055             # Make sure that we get a last message telling us we are done
1056             elapsed_time = time.time() - self._start_time
~\anaconda3\envs\SDaC\lib\site-packages\joblib\parallel.py in retrieve(self)
931             try:
932                 if getattr(self._backend, 'supports_timeout', False):
--> 933                     self._output.extend(job.get(timeout=self.timeout))
934                 else:
935                     self._output.extend(job.get())
~\anaconda3\envs\SDaC\lib\site-packages\joblib\_parallel_backends.py in wrap_future_result(future, timeout)
540         AsyncResults.get from multiprocessing."""
541         try:
--> 542             return future.result(timeout=timeout)
543         except CfTimeoutError as e:
544             raise TimeoutError from e
~\anaconda3\envs\SDaC\lib\concurrent\futures\_base.py in result(self, timeout)
433                 raise CancelledError()
434             elif self._state == FINISHED:
--> 435                 return self.__get_result()
436             else:
437                 raise TimeoutError()
~\anaconda3\envs\SDaC\lib\concurrent\futures\_base.py in __get_result(self)
382     def __get_result(self):
383         if self._exception:
--> 384             raise self._exception
385         else:
386             return self._result
PicklingError: ("Can't pickle <class '__main__.CustomTokenizer'>: it's not found as __main__.CustomTokenizer", 'PicklingError while hashing {'transformer': CustomTokenizer(), 'X':       kostenposition_bau_nr_komplett  ...                                               textn12862                            326  ...                           Fenster Holzfenster AQ 1n17556                            326  ...       Scheiben verkratzt Holzfenster AQ 7, 8.1-8.2n11648                            314  ...  Boden am Übergang zwischen Naturstein und Beto...n2344                             300  ...  Farbverschmutzung Decke (Lampe) Farbverschmutz...n13097                            326  ...  Sonnenschutz einstellen linkes Fenster klapper...n...                              ...  ...                                                ...n17213                            327  ...  105 Küche Fuge Arbeitsplatte Rückwand fehlt Ti...n4200                             300  ...  offene Hartverfugung Boden (Dusche) offene Har...n12443                            327  ...   Leichter Versatzder verkleidungsteile am Lich...n14023                            324  ...        Fuge mit Lücken Bad GU AQ 4, 5, 6, 8.1, 8.2n3635                             300  ...  reinigen Glashalteleiste (WC) reinigen Glashal...nn[9731 rows x 3 columns], 'y': array(['326', '326', '314', ..., '327', '324', '300'], dtype='<U3'), 'weight': None, 'message_clsname': 'Pipeline', 'message': None, '**': {}}: PicklingError("Can't pickle <class '__main__.CustomTokenizer'>: it's not found as __main__.CustomTokenizer")')

Additional information
  • scikit-learn 0.23.2
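
The PicklingError above complains about a class defined in the notebook's __main__ namespace. A commonly suggested workaround, sketched here under the assumption that CustomTokenizer and the other custom transformers are defined directly in the notebook, is to move them into a regular module and import them, so that both the loky worker processes and joblib's Memory hashing can pickle them. The file name custom_transformers.py is hypothetical:

# custom_transformers.py  (hypothetical module name)
# Defining the classes at module level instead of inside the notebook makes
# them importable, and therefore picklable, in the loky worker processes.
from sklearn.base import BaseEstimator, TransformerMixin

class CustomTokenizer(BaseEstimator, TransformerMixin):
    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # placeholder: the actual tokenization logic of the project goes here
        return X

# In the notebook:
# from custom_transformers import CustomTokenizer, FastTextTransformer, AverageWordVectorTransformer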

I tried to deal with the problem by referencing Tensorflow inside the model definition, which I believe is what GridSearchCV instantiates internally during parallel execution. You can find my model definition below. clear_session() is supposed to free the memory of each graph in the sessions created during the parallel grid-search run. config.gpu_options.allow_growth = True and config.gpu_options.per_process_gpu_memory_fraction = 0.3 provide a way to use GridSearchCV with n_jobs=-1 (n_jobs > 1). After these modifications I can see multiple training streams and no noticeable cumulative increase in memory usage. Note that the fraction must be large enough for each process to run; with many concurrent processes, memory blow-up cannot be avoided. One option is to set the fraction to 1/(number of CPUs), but with many CPUs there is a risk that the fraction becomes too small for a process to run. Also, when more than one GPU is available, the total memory of the GPU with the least memory must be taken into account.

# Function to create model, required for KerasClassifier
def mlp(
    loss='binary_crossentropy', optimizer='adam',
    metrics=['accuracy'], init_mode='uniform',
    activation='relu', dropout_rate=0.0, weight_constraint=2,
    neurons=12, input_shape=(5,)
):
    from tensorflow.keras.models import Sequential  # type: ignore
    from tensorflow.keras.layers import Dense  # type: ignore
    from tensorflow.keras.constraints import MaxNorm  # type: ignore
    from tensorflow.keras.layers import Dropout  # type: ignore
    # This is to try to cope with the memory leak caused by graph creation
    # in Tensorflow.
    from tensorflow.keras.backend import clear_session  # type: ignore
    clear_session()
    # This is supposed to help with the OOM problem when using multiprocessing
    # and grid search, including the memory limit.
    from tensorflow.compat.v1 import ConfigProto  # type: ignore
    from tensorflow.compat.v1.keras.backend import set_session  # type: ignore
    from tensorflow.compat.v1 import Session  # type: ignore
    config = ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.per_process_gpu_memory_fraction = 0.3
    set_session(Session(config=config))  # type: ignore
    # create model
    model = Sequential()
    model.add(Dense(
        units=neurons, input_shape=input_shape, kernel_initializer=init_mode,
        activation=activation, kernel_constraint=MaxNorm(weight_constraint))
    )
    model.add(Dropout(dropout_rate))
    model.add(Dense(1, kernel_initializer=init_mode, activation='sigmoid'))
    # Compile model
    model.compile(loss=loss, optimizer=optimizer, metrics=metrics)
    return model
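
For completeness, here is a minimal usage sketch (not from the original post) of how such a build function is typically handed to GridSearchCV via KerasClassifier; the parameter grid, epochs, and batch size are placeholders:

# Hedged usage sketch: wrap the build function in KerasClassifier and run the
# grid search in parallel; all concrete values below are placeholders.
from tensorflow.keras.wrappers.scikit_learn import KerasClassifier
from sklearn.model_selection import GridSearchCV

clf = KerasClassifier(build_fn=mlp, epochs=10, batch_size=32, verbose=0)
grid = GridSearchCV(
    estimator=clf,
    param_grid={'neurons': [12, 24], 'dropout_rate': [0.0, 0.2]},
    cv=3,
    n_jobs=-1,  # multiple processes, each limited by per_process_gpu_memory_fraction
)
# grid.fit(X_train, Y_train)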

Latest update