我正在尝试创建一个后台作业与RQ:
import django_rq
def _send_password_reset_email_async(email):
print(email)
# Django admin action to send reset password emails
def send_password_reset_email(modeladmin, request, queryset):
for user in queryset:
django_rq.enqueue(_send_password_reset_email_async, user.email)
send_password_reset_email.short_description = 'Send password reset email'
我一直得到这个错误,虽然,似乎我在做一些愚蠢的?
Traceback (most recent call last):
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/worker.py", line 568, in perform_job
rv = job.perform()
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/job.py", line
495, in perform
self._result = self.func(*self.args, **self.kwargs)
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/job.py", line 206, in func
return import_attribute(self.func_name)
File "/home/lee/Code/cas/venv/lib/python3.4/site-packages/rq/utils.py", line 151, in import_attribute
return getattr(module, attribute)
AttributeError: 'module' object has no attribute '_send_password_reset_email_async
"
添加注释,因为我也遇到了这个错误:
当你修改代码时,Django开发服务器会动态地重新加载,但是rqworker进程不会。所以如果你添加/修改一个任务&当新代码(你的web代码)试图调用旧代码(RQ任务)时,你可能会遇到这个错误。
所以,你实际需要做的是将模块导入到某个文件中,然后对它执行延迟。@MrE,所以没有必要在init.py文件
当您从home或其他目录运行rqworker时,会出现此问题。在命令提示符中导航到python文件所在的目录,然后运行rqworker命令。
注意:注意你的redis服务器运行的端口,如果它不是默认的,即6379,那么你将不得不在运行rqworker时提到端口,就像这样。Your_python_file_directory_path $ rqworker——url redis://localhost:port如何将_send_password_reset_email_async
封装在reset_email函数中-
def send_password_reset_email(modeladmin, request, queryset):
def _send_password_reset_email_async(email):
print(email)
for user in queryset:
django_rq.enqueue(_send_password_reset_email_async, user.email)
send_password_reset_email.short_description = 'Send password reset email'
复制https://github.com/ui/django-rq/issues/125中@richardbarran的注释
Rqworker不会在你修改代码时动态重新加载,但是如果你添加/修改了一个任务,Django开发服务器就会这样做。立即调用它,您可能会遇到这个错误,因为新代码(您的web代码)尝试调用旧代码(RQ任务)。
停止/重启rqworker,你就可以了。
我有一个类似的问题,并确保在应用程序的__init__.py
中导入模块解决了这个问题。
尝试在另一个文件中编写函数,将其作为模块导入
# in my_utility.py
def send_password_reset_email_async(email):
print(email)
然后import django_rq
from my_utility import send_password_reset_email_async
#def _send_password_reset_email_async(email):
# print(email)
# Django admin action to send reset password emails
def send_password_reset_email(modeladmin, request, queryset):
for user in queryset:
django_rq.enqueue(send_password_reset_email_async, user.email)
send_password_reset_email.short_description = 'Send password reset email'
在我的例子中,我没有把任务放在tasks.py文件中…