如何使一个单一的smtp客户端线程安全的Python?



我甚至不确定我是否问对了问题,所以请原谅我。

我有一个运行python代码的Azure功能应用程序。它接收JSON有效负载,进行一些转换,并发送电子邮件。

一切工作正常,直到函数开始接收请求爆发。这会导致异常:smtplib.SMTPDataError: (432, b'4.3.2 Concurrent connections limit exceeded. Visit https://aka.ms/concurrent_sending for more information. [Hostname=xxxxxxxx]')

我决定为连接客户端创建一个单例来解决这个问题:

class EmailSingleton(type):
_instances = {}
def __call__(cls, *args, **kwargs):
if cls not in cls._instances:
cls.__login(*args, **kwargs)
else:
try:
status = cls._instances[cls].noop()[0]
except smtplib.SMTPServerDisconnected:
status = -1
if status != 250:
log.info("SMTP Client disconnected")
cls.__login(*args, **kwargs)
return cls._instances[cls]
def __login(cls, host, port, user, password, timeout):
log.info("Refreshing SMTP client")
cls._instances[cls] = super(EmailSingleton, cls).__call__(host=host, port=port, timeout=timeout)
cls._instances[cls].starttls()
cls._instances[cls].login(user=user, password=password)
class EmailConnectionClientSingleton(smtplib.SMTP, metaclass=EmailSingleton):
pass

,只需调用:

EmailConnectionClientSingleton(
host=host,
port=port,
timeout=timeout,
user=user,
password=password
).send_message(msg)

当请求一个接一个地到达时,这工作得很好,但是当它们突然到达时,我得到了各种有趣的异常:

  • smtplib.SMTPDataError: (503, b'5.5.1 Bad sequence of commands')
  • smtplib.SMTPDataError: (250, b'2.1.5 Recipient OK')
  • smtplib.SMTPDataError: (250, b'2.0.0 OK')
  • smtplib.SMTPRecipientsRefused: {'recipient@domain.com': (503, b'5.5.1 Bad sequence of commands')}

看起来上面所有这些都是由于握手重叠和响应无序造成的。只有大约1/3的邮件成功。

我该如何处理这个问题?我应该使用单例客户机吗(这告诉我应该:https://learn.microsoft.com/en-us/azure/azure-functions/manage-connections?tabs=csharp#static-clients)?我是否尝试使我的单例客户端线程安全?我是否为send_message调用创建一个队列?我是否需要创建一个从不同功能发送电子邮件的worker ?

我尝试在创建客户端时使用_lock = threading.Lock(),但这没有帮助,因为问题似乎是使用客户端而不是创建它。

我最终使用了queue.Queue:

class Singleton(type):
_instance = None
_lock = Lock()
def __call__(cls, *args, **kwargs):
if not cls._instance:
with cls._lock:
# another thread could have created the instance
# before we acquired the lock. So check that the
# instance is still nonexistent.
if not cls._instance:
cls._instance = super().__call__(*args, **kwargs)
return cls._instance

class QueueSingleton(Queue, metaclass=Singleton):
pass

我没有立即发送邮件,而是把它放在队列中:

queue = QueueSingleton()
queue.put(msg)
log.info("Email put in the queue")

然后我创建了一个新的Azure函数,它使用队列发送电子邮件。

queue = QueueSingleton()
while start_time + FUNCTION_MAX_LIFE > datetime.now():
try:
msg = queue.get(timeout=300)
log.info("Got an email to send!")
except Empty:
# If we didn't get anything to send during timeout window
# end the execution and allow TimeTrigger to spawn a new worker
log.info("No emails to send. Quitting.")
break
try:
email_connection.send_message(msg)
queue.task_done()
log.info(f'Email sent: {msg["To"]}')

感觉很垃圾,但看起来很有效。

最新更新