Uvicorn异步工作者仍在同步工作



问题简而言之

我已经将我的项目从Django 2.2迁移到Django 3.2,现在我想开始使用异步视图的可能性。我创建了一个异步视图,设置了asgi配置,并使用一个Uvicorn工作程序运行gunicorn。当这个服务器同时有10个用户时,他们会得到同步服务。我需要配置什么才能为10个并发用户提供异步视图?

问题详细

这就是我目前在本地环境中所做的:

  • 我正在使用Django 3.2.10和Python 3.9
  • 我已经通过pip安装了gunicornuvicorn
  • 我创建了一个包含以下内容的asgi.py文件
import os
from django.core.asgi import get_asgi_application
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'MyService.settings.local')
application = get_asgi_application()
  • 我创建了一个具有以下实现的视图,并在urlpatterns中连接它:
import asyncio
import json
from django.http import HttpResponse

async def async_sleep(request):
await asyncio.sleep(1)
return HttpResponse(json.dumps({'mode': 'async', 'time': 1).encode())
  • 我用一个Uvicorn工作人员在本地运行gunicorn服务器:
gunicorn MyService.asgi:application -k uvicorn.workers.UvicornWorker
[2022-01-26 14:37:14 +0100] [8732] [INFO] Starting gunicorn 20.1.0
[2022-01-26 14:37:14 +0100] [8732] [INFO] Listening at: http://127.0.0.1:8000 (8732)
[2022-01-26 14:37:14 +0100] [8732] [INFO] Using worker: uvicorn.workers.UvicornWorker
[2022-01-26 14:37:14 +0100] [8733] [INFO] Booting worker with pid: 8733
[2022-01-26 13:37:15 +0000] [8733] [INFO] Started server process [8733]
[2022-01-26 13:37:15 +0000] [8733] [INFO] Waiting for application startup.
[2022-01-26 13:37:15 +0000] [8733] [INFO] ASGI 'lifespan' protocol appears unsupported.
[2022-01-26 13:37:15 +0000] [8733] [INFO] Application startup complete.
  • 我曾经从本地客户端访问过API。1秒后,我得到了200 OK,正如预期的那样
  • 我设置了一个蝗虫服务器来生成并发用户。当我让它与1个并发用户进行请求时,每1秒就完成一个API调用
  • 当我让它向10个并发用户发出请求时,每1秒就完成一个API调用。所有其他请求都在等待

最后一件事不是我所期望的。我希望工作人员在异步睡眠时已经接收到下一个请求。我是否缺少某些配置?

我也试过用达芙妮代替乌维科恩,但结果是一样的。

蝗虫

这就是我如何建立我的蝗虫。

  • 启动一个新的虚拟机
  • pip install locust
  • 创建具有以下内容的locustfile.py
from locust import HttpUser, task
class SleepUser(HttpUser):
@task
def async_sleep(self):
self.client.get('/api/async_sleep/')
  • 从shell运行蝗虫可执行文件
  • 访问http://0.0.0.0:8089在浏览器中
  • 将工作者数量设置为10,产卵率设置为1,主机设置为http://127.0.0.1:8000

中间件

这些是我的中间件设置

MIDDLEWARE = [
'django_prometheus.middleware.PrometheusBeforeMiddleware',
'corsheaders.middleware.CorsMiddleware',
'django.middleware.gzip.GZipMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.security.SecurityMiddleware',
'shared.common.middleware.ApiLoggerMiddleware',
'django_prometheus.middleware.PrometheusAfterMiddleware',
]

共享的ApiLoggerMiddleware来自我们自己的代码,我将首先研究这个。这就是它的实现

import logging
import os
from typing import List
from django.http import HttpRequest, HttpResponse
from django.utils import timezone
from shared.common.authentication_service import BaseAuthenticationService

class ApiLoggerMiddleware:
TOO_BIG_FOR_LOG_BYTES = 2 * 1024
def __init__(self, get_response):
# The get_response callable is provided by Django, it is a function
# that takes a request and returns a response. Plainly put, once we're
# done with the incoming request, we need to pass it along to get the
# response which we need to ultimately return.
self._get_response = get_response
self.logger = logging.getLogger('api')
self.pid = os.getpid()
self.request_time = None
self.response_time = None
def __call__(self, request: HttpRequest) -> HttpResponse:
common_data = self.on_request(request)
response = self._get_response(request)
self.on_response(response, common_data)
return response
def truncate_body(self, request: HttpRequest) -> str:
return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"
def on_request(self, request: HttpRequest) -> List[str]:
self.request_time = timezone.now()
remote_address = self.get_remote_address(request)
user_agent = request.headers.get('User-Agent') or ''
customer_uuid = self.get_customer_from_request_auth(request)
method = request.method
uri = request.get_raw_uri()
common = [
remote_address,
user_agent,
customer_uuid,
method,
uri
]
in_line = [
"IN",
str(self.pid),
str(self.request_time),
] + common + [
self.truncate_body(request)
]
self.logger.info(', '.join(in_line))
return common
def on_response(self, response: HttpResponse, common: List[str]) -> None:
self.response_time = timezone.now()
out_line = [
"OUT",
str(self.pid),
str(self.response_time)
] + common + [
str(self.response_time - self.request_time),
str(response.status_code),
]
self.logger.info(", ".join(out_line))
@classmethod
def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
token = request.headers.get('Authorization')
if not token:
return 'no token'
try:
payload = BaseAuthenticationService.validate_access_token(token)
return payload.get('amsOrganizationId', '')
except Exception:
return 'unknown'
@classmethod
def get_remote_address(cls, request: HttpRequest) -> str:
if 'X-Forwarded-For' in request.headers:
# in case the request comes in through a proxy, the remote address
# will be just the last proxy that passed it along, that's why we
# have to get the remote from X-Forwarded-For
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
addresses = request.headers['X-Forwarded-For'].split(',')
client = addresses[0]
return client
else:
return request.META.get('REMOTE_ADDR', '')

来源

我使用过的来源:

Django 3.0中ASGI及其性能指南
  • 如何使用Django与Uvicorn
  • 您的ApiLoggerMiddleware是一个同步中间件。

    发件人https://docs.djangoproject.com/en/4.0/topics/async/#async-视图,强调我的:

    只有在站点中没有加载同步中间件的情况下,您才能获得完全异步请求堆栈的好处如果有一个同步中间件,那么Django必须在每个请求中使用一个线程来安全地模拟它的同步环境。

    中间件可以同时支持同步和异步上下文。Django的一些中间件是这样构建的,但不是全部要查看Django必须适应的中间件,可以打开django.request记录器的调试日志,并查找有关"Synchronous middleware…adapted"的日志消息

    (日志消息当前显示"异步中间件…已调整",错误报告在#33495。(

    通过将以下内容添加到LOGGING设置中,打开django.request记录器的调试日志:

    'django.request': {
    'handlers': ['console'],
    'level': 'DEBUG',
    },
    

    解决方案

    使ApiLoggerMiddleware异步:

    1. 继承django.utils.deprecation.MiddlewareMixin
      • 调用__init__中的super().__init__(get_response)
      • 去除CCD_ 15;MiddlewareMixin.__call__使您的中间件异步
    2. 将CCD_ 17重构为CCD_。
      • 返回None而不是common
      • common附加到requestrequest.common = common
        请记住更新对request.common的引用
      • request_time附加到request而不是self以使其(和中间件(线程安全
        请记住更新对request.request_time的引用
    3. 重构CCD_ 29到CCD_。
      • 返回response
      • 不要将response_time附加到self;将它保留为一个变量,因为它没有在其他函数中使用

    结果:

    class ApiLoggerMiddleware(MiddlewareMixin):
    TOO_BIG_FOR_LOG_BYTES = 2 * 1024
    def __init__(self, get_response):
    # The get_response callable is provided by Django, it is a function
    # that takes a request and returns a response. Plainly put, once we're
    # done with the incoming request, we need to pass it along to get the
    # response which we need to ultimately return.
    super().__init__(get_response)  # +
    self._get_response = get_response
    self.logger = logging.getLogger('api')
    self.pid = os.getpid()
    # self.request_time = None   # -
    # self.response_time = None  # -
    # def __call__(self, request: HttpRequest) -> HttpResponse:  # -
    #     common_data = self.on_request(request)                 # -
    #     response = self._get_response(request)                 # -
    #     self.on_response(response, common_data)                # -
    #     return response                                        # -
    def truncate_body(self, request: HttpRequest) -> str:
    return f"{request.body[:self.TOO_BIG_FOR_LOG_BYTES]}"
    # def on_request(self, request: HttpRequest) -> List[str]:  # -
    def process_request(self, request: HttpRequest) -> None:    # +
    # self.request_time = timezone.now()   # -
    request.request_time = timezone.now()  # +
    remote_address = self.get_remote_address(request)
    user_agent = request.headers.get('User-Agent') or ''
    customer_uuid = self.get_customer_from_request_auth(request)
    method = request.method
    uri = request.get_raw_uri()
    common = [
    remote_address,
    user_agent,
    customer_uuid,
    method,
    uri
    ]
    in_line = [
    "IN",
    str(self.pid),
    # str(self.request_time),   # -
    str(request.request_time),  # +
    ] + common + [
    self.truncate_body(request)
    ]
    self.logger.info(', '.join(in_line))
    # return common          # -
    request.common = common  # +
    return None              # +
    # def on_response(self, response: HttpResponse, common: List[str]) -> None:                # -
    def process_response(self, request: HttpRequest, response: HttpResponse) -> HttpResponse:  # +
    # self.response_time = timezone.now()  # -
    response_time = timezone.now()         # +
    out_line = [
    "OUT",
    str(self.pid),
    # str(self.response_time)  # -
    str(response_time)         # +
    # ] + common + [                    # -
    ] + getattr(request, 'common', []) + [  # +
    # str(self.response_time - self.request_time),             # -
    str(response_time - getattr(request, 'request_time', 0)),  # +
    str(response.status_code),
    ]
    self.logger.info(", ".join(out_line))
    return response  # +
    @classmethod
    def get_customer_from_request_auth(cls, request: HttpRequest) -> str:
    token = request.headers.get('Authorization')
    if not token:
    return 'no token'
    try:
    payload = BaseAuthenticationService.validate_access_token(token)
    return payload.get('amsOrganizationId', '')
    except Exception:
    return 'unknown'
    @classmethod
    def get_remote_address(cls, request: HttpRequest) -> str:
    if 'X-Forwarded-For' in request.headers:
    # in case the request comes in through a proxy, the remote address
    # will be just the last proxy that passed it along, that's why we
    # have to get the remote from X-Forwarded-For
    # https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Forwarded-For
    addresses = request.headers['X-Forwarded-For'].split(',')
    client = addresses[0]
    return client
    else:
    return request.META.get('REMOTE_ADDR', '')
    

    最新更新