我使用了默认的Django管理面板作为后端。我有一个博客模型。我想做的是,每当管理员用户在Django admin上保存博客对象时,我需要向时事通讯订户发送一封电子邮件,通知他们网站上有一个新博客。
我必须发送大量电子邮件,所以我使用django芹菜 此外,我正在使用django信号来触发发送电子邮件功能
但是现在,我不用芹菜送,但是太慢了。
class Subscribers(models.Model):
email = models.EmailField(unique=True)
date_subscribed = models.DateField(auto_now_add=True)
def __str__(self):
return self.email
class Meta:
verbose_name_plural = "Newsletter Subscribers"
# binding signal:
@receiver(post_save,sender=BlogPost)
def send_mails(sender,instance,created,**kwargs):
subscribers = Subscribers.objects.all()
if created:
blog = BlogPost.objects.latest('date_created')
for abc in subscribers:
emailad = abc.email
send_mail('New Blog Post ', f" Checkout our new blog with title {blog.title} ",
EMAIL_HOST_USER, [emailad],
fail_silently=False)
else:
return
使用芹菜文档,我编写了以下文件。
我的芹菜.py
from __future__ import absolute_import
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE','travel_crm.settings')
app = Celery('travel_crm')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
Mu初始化文件:
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)
来自文档的任务文件:
def my_first_task(duration):
subject= 'Celery'
message= 'My task done successfully'
receiver= 'receiver_mail@gmail.com'
is_task_completed= False
error=''
try:
sleep(duration)
is_task_completed= True
except Exception as err:
error= str(err)
logger.error(error)
if is_task_completed:
send_mail_to(subject,message,receivers)
else:
send_mail_to(subject,error,receivers)
return('first_task_done')
这个任务不起作用,因为我正在使用Django信号来触发发送电子邮件功能,如何将其应用到任务中。py
我想我理解你的问题。。。我最近也遇到了类似的挑战,其中包括多租户[模式]数据库的复杂性[被证明是Redis的一个问题]。我也试过django芹菜,但它依赖于更老版本的芹菜。此外,我想发送由模型信号postrongave发起的群发邮件。。。使用具有"bcc"one_answers"回复"功能的EmailMultiAlternatives。
所以现在我使用最新的Django,最新的带有Redis的Celery。。。在macOS localhost上运行Poetry虚拟环境&软件包管理器。以下对我有效:
-
芹菜:我花了几个小时在网上搜索教程和建议。。。其中,这一次为我增加了价值Celery w Django。如果你还没有深入研究芹菜,那么这是一个很好的实践。
-
Redis:这将取决于你的操作系统,以及你是在本地还是远程开发。Redis网站将指导您进行设置。我也尝试过RabbitMQ,但[个人]发现它的设置更复杂。
-
代码分数:有4个分数myapp/signals.py,myapp/tasks.py,myproj/celery.py,my proj/settings.py免责声明:我是一个业余程序员。。。更有经验的工程师很可能会改进我的代码。。。我做了一些小测试,一切似乎都正常。
# myapp/signals.py @receiver(post_save, sender=MyModel) def post_save_handler(sender, instance, **kwargs): if instance.some_field == True: recipient_list = list(get_user_model().objects.filter('some filters')) from_email = SomeModel.objects.first().site_email to_email = SomeModel.first().admin_email # call async task Celery task_send_mail.delay(instance.some_field, instance.someother_field, from_email, to_email, recipient_list) # myapp/tasks.py @shared_task(name='task_sendmail') def task_send_mail(instance.some_field, instance.someother_field, from_email, to_email, recipient_list): template_name = 'emails/sometemplate.html' html_message = render_to_string(template_name, {'body': instance.some_field,}) # These variables are added to the email template plain_message = strip_tags(html_message) subject = f'Do Not Reply : {instance.someother_field}' connection = get_connection() connection.open() message = EmailMultiAlternatives( subject, plain_message, from_email, [to_email], bcc=recipient_list, reply_to=[to_email], ) message.attach_alternative(html_message, "text/html") try: message.send() connection.close() except SMTPException as e: print('There was an error sending email: ', e) connection.close() # myproj/celery.py import os from celery import Celery # Set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings') app = Celery('myproj') # Using a string here means the worker doesn't have to serialize # the configuration object to child processes. # - namespace='CELERY' means all celery-related configuration keys # should have a `CELERY_` prefix. app.config_from_object('django.conf:settings', namespace='CELERY') # Load task modules from all registered Django apps. app.autodiscover_tasks() @app.task(bind=True) def debug_task(self): print(f'Request: {self.request!r}') # myproj/settings.py ... ##Celery Configuration Options CELERY_BROKER_URL = 'redis://localhost:6379//' CELERY_TIMEZONE = "Africa/SomeCity" CELERY_TASK_TRACK_STARTED = True CELERY_TASK_TIME_LIMIT = 30 * 60