Celery使用Numpy Pickle读取Joblib保存的文件时出现问题



我们遇到了一个非常奇怪的问题,Celery试图读取我们用joblib保存的模型文件。这是一个使用工厂模式的Flask应用程序。以下是总体设置:

fit_model.py:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
import joblib

class MyClass(Pipeline):
pass

if __name__ == "__main__":
to_persist = MyClass(steps=[("step", OneHotEncoder())])
joblib.dump(to_persist, "dummy.model")

load_model.py:

import joblib

if __name__ == "__main__":
with open("dummy.model", "rb") as f:
my_object = joblib.load(f)
assert my_object is not None

tasks.py:

import joblib
@celery_app.task()
def dummy_task():
_ = joblib.load("dummy.model")

如果我们先运行lit_model.py文件,然后再运行load_model.py,它运行得非常好。然而,当这作为一个Celery任务运行并且我们试图延迟dummy_task时,我们得到以下结果:

File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 412, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/celery/app/trace.py", line 704, in __protected_call__
return self.run(*args, **kwargs)
File "/lit_service/tasks.py", line 32, in dummy_task
_ = joblib.load("dummy.model")
File "/usr/local/lib/python3.7/site-packages/joblib/numpy_pickle.py", line 585, in load
obj = _unpickle(fobj, filename, mmap_mode)
File "/usr/local/lib/python3.7/site-packages/joblib/numpy_pickle.py", line 504, in _unpickle
obj = unpickler.load()
File "/usr/local/lib/python3.7/pickle.py", line 1088, in load
dispatch[key[0]](self)
File "/usr/local/lib/python3.7/pickle.py", line 1376, in load_global
klass = self.find_class(module, name)
File "/usr/local/lib/python3.7/pickle.py", line 1430, in find_class
return getattr(sys.modules[module], name)
AttributeError: module 'celery.bin.celery' has no attribute 'MyClass'

application.py:中按如下方式设置烧瓶芹菜工人

celery_app = Celery(
__name__, broker=settings.CELERY_BROKER_URL, backend=settings.CELERY_BROKER_URL
)
def create_app(**config_overrides):
app = Flask(__name__)
# start celery
celery_app.conf.update(app.config)
celery_app.conf.imports = ["tasks"]
celery_app.conf.task_routes = {"tasks.*": {"queue": "lit_tasks"}}
return app

工人被衍生为Docker容器,如下所示:

celery worker -A celery_worker.celery_app --loglevel=INFO --queues lit_tasks

有什么线索可以找吗?

尝试在自己的文件(如my_class.py(中定义MyClass,并将from my_class import MyClass添加到fit_model.py

最新更新