是否存在将Kerberos身份验证集成到FastApi中的现有方法?
我可以在这个SO问题中看到创建自定义身份验证方法的细节。FastApi文档描述了如何实现Basic验证。我是否认为我需要修改它以返回带有Www-Authenticate的401:协商不使用令牌并验证授权头中提供的令牌?在Python中是否有这样的示例,或者是否已经有现成的Kerberos库用于FastApi ?
是的,通过SPNEGO的Kerberos是通过WWW-Authenticate中的Negotiate
机制实现的。您可以使用Pythonspnego
、gssapi
或sspi
模块来验证Kerberos令牌(第一个是跨平台的,另外两个分别用于Linux和Windows)。在大多数情况下,它相当简单:
- 对于python-pyspnego,创建一个
spnego.server()
来验证令牌 - 对于python-gssapi,创建一个
gssapi.SecurityContext(creds=..., usage="accept")
。 - python-pykerberos也可以工作,但是使用起来就不那么愉快了。
票证将根据系统默认的keytab(或$KRB5_KTNAME)进行验证;gssapi
模块允许通过代码指定自定义服务器凭据,否则您需要通过os.environ指定keytab。
mech, _, in_token = req.headers["Authorization"].partition(b" ")
if mech == b"Negotiate":
# contexts are not reusable -- each HTTP request is a brand new auth
context = gssapi.SecurityContext(usage="accept")
in_token = base64.b64decode(in_token)
# step() throws exception on auth failure
out_token = context.step(in_token)
out_token = base64.b64encode(out_token)
out_hdr = b"WWW-Authenticate: " + mech + b" " + out_token
我没有FastApi的具体示例,所以我将假装它是一个通用的ASGI应用程序-这里是我在各种基于WSGI的项目中使用的WSGI中间件的大多数工作ASGI端口,只是作为如何使用gssapi.Credentials()
的示例:
# (c) Mantas Mikulėnas <grawity@gmail.com>; released under the MIT license
import asyncio
import base64
import gssapi
class GSSAPIWrapper():
def __init__(self, app, *,
service_name=None,
keytab_path=None,
credentials=None):
if service_name and credentials:
raise ValueError("'service_name' and 'credentials' are mutually exclusive")
if keytab_path and credentials:
raise ValueError("'keytab_path' and 'credentials' are mutually exclusive")
if not credentials:
if not isinstance(service_name, gssapi.Name):
service_name = gssapi.Name(service_name or "HTTP@",
gssapi.NameType.hostbased_service)
if keytab_path:
credentials = gssapi.Credentials(name=service_name,
usage="accept",
store={"keytab": keytab_path})
else:
credentials = gssapi.Credentials(name=service_name,
usage="accept")
self.app = app
self.creds = credentials
async def __call__(self, scope, receive, send):
if scope["type"] != "http":
return await self.app(scope, receive, send)
in_hdr = dict(scope["headers"]).get(b"authorization", b"")
if in_hdr.startswith(b"Negotiate "):
ctx = gssapi.SecurityContext(creds=self.creds, usage="accept")
in_token = base64.b64decode(in_hdr[10:])
out_token = ctx.step(in_token)
#out_token = await asyncio.to_thread(ctx.step, in_token)
if ctx.complete:
scope.setdefault("extensions", {})
scope["extensions"]["gss.initiator"] = ctx.initiator_name
scope["extensions"]["REMOTE_USER"] = str(ctx.initiator_name)
async def custom_send(event):
if out_token and event["type"] == "http.response.start":
out_hdr = base64.b64encode(out_token)
event.setdefault("headers", [])
event["headers"].append((b"WWW-Authenticate",
b"Negotiate " + out_hdr))
await send(event)
return await self.app(scope, receive, custom_send)
# No input header, or the context didn't complete in 1 step
await send({"type": "http.response.start",
"status": 401,
"headers": [(b"WWW-Authenticate", b"Negotiate")]})
await send({"type": "http.response.body",
"body": b""})