在Connexion中验证请求正文之前验证签名消息



目前,我有一个正在工作的API,它使用Connexion并接收OpenAPI规范:

connexion_app.add_api(
"openapi.yaml",
options={"swagger_ui": False},
validate_responses=True,
strict_validation=True,  # Changing this also didn't help
)

响应按以下顺序进行验证:

  1. 检查API-Key是否有效
  2. 验证请求正文是否包含所有必要的参数
  3. 验证消息签名
  4. 处理请求并发送响应

API-Key的验证通过OpenAPI规范进行:

securitySchemes:
apiKeyAuth:
type: apiKey
in: header
name: API-Key
x-apikeyInfoFunc: server.security.check_api_key
security:
- apiKeyAuth: []

验证也通过OpenAPI规范完成。

签名在端点中得到验证:

if not verify_signature(kwargs):
abort(401, "Signature could not be verified")

其中verify_signature基本上是这样的:

def verify_signature(request) -> bool:
"""Calculate the signature using the header and data."""
signature = re.findall(r'"([A-Za-z0-9+/=]+)"', connexion.request.headers.get("Message-Signature", ""))
created = re.findall(r"created=(d+)", connexion.request.headers.get("Message-Signature", ""))
if len(signature) == 0:
abort(401, "No valid Signature found.")
if len(created) == 0:
abort(401, "No valid created timestamp found.")
signature = signature[0]
created = int(created[0])
method, path, host, api_key, content_type = _get_attributes_from_request()
message = create_signature_message(request["body"], created, method, path, host, api_key, content_type)
recreated_signature = _encode_message(message)
return recreated_signature == str(signature)

出于安全考虑,我想交换2。和3.:

  1. 检查API-Key是否有效
  2. 验证消息签名
  3. 验证请求正文是否包含所有必要的参数
  4. 处理请求并发送响应

问题是,在我到达执行Python代码(如verify_signature(的端点之前,Connexion会验证主体。

我尝试将以下内容添加到我的OpenAPI.yaml:

signatureAuth:
type: http
scheme: basic
x-basicInfoFunc: server.security.verify_signature
security:
- apiKeyAuth: []
signatureAuth: []

但我认为这是错误的方法,因为我认为这只是一种简单的验证方法,我得到了以下错误消息:No authorization token provided

现在我的问题是:

是否有一种方法可以执行一个函数,该函数接收在Connexion验证主体之前执行的整个请求

是的,您可以使用Connexionbefore_request注释,以便在验证正文之前对新请求运行函数。下面是一个记录标题和内容的示例:

import connexion
import logging
from flask import request
logger = logging.getLogger(__name__)
conn_app = connexion.FlaskApp(__name__)
@conn_app.app.before_request
def before_request():
for h in request.headers:
logger.debug('header %s', h)
logger.debug('data %s', request.get_data())

最新更新