Django DRF发布多数据库连接



我是DRF的新手,我正在从事一个连接多个数据库的项目。我可以使用get从任何指定的数据库中检索数据,但当使用post时,我会收到一个与数据库连接和表名有关的错误。我想我需要在post方法中参考数据库,但我正在着手研究这个问题。有人知道如何引用self.create的特定数据库吗?

class AdminView(
mixins.ListModelMixin, 
mixins.CreateModelMixin,
generics.GenericAPIView):
serializer_class = AdminMeterLakeSerializer
queryset = AdminMeterLake.objects.using('testdb').all()
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
return self.create(request.data, *args, **kwargs)

我建议创建上下文管理器。这将允许在未来有效地控制数据库连接:

  • 将DB路由器添加到config/settings.py
...
DATABASE_ROUTERS = [
'config.routers.DynamicDatabaseRouter',
]
...
  • config/routers.py中创建DynamicDatabaseRouter
from contextvars import ContextVar
active_db = ContextVar("DB to use", default=None)
def get_active_db():
# return default connection if not set
db = active_db.get(None)
return db if db else 'default'
def set_active_db(connection_name):
return active_db.set(connection_name)
class DynamicDatabaseRouter:
@staticmethod
def _get_db(*args, **kwargs):
db = get_active_db()
return db
db_for_read = _get_db
db_for_write = _get_db
  • 将上下文管理器添加到config/context.py中:
from contextlib import ContextDecorator
from config.routers import set_active_db

class db(ContextDecorator):
def __init__(self, connection_name):
self.connection_name = connection_name
def __enter__(self):
set_active_db(connection_name)
return self
def __exit__(self, *exc):
set_active_db(None)
return False

你的课程应该更新为:

from config.context import db
class AdminView(
mixins.ListModelMixin, 
mixins.CreateModelMixin,
generics.GenericAPIView):
serializer_class = AdminMeterLakeSerializer
queryset = AdminMeterLake.objects.using('testdb').all()
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
with db('testdb'):
return self.create(request.data, *args, **kwargs)

流程为:

  1. 运行with db('testdb')时,会调用__enter__方法
  2. active_db上下文变量值将更新为testdb
  3. 数据库路由器读取active_db值并使用testdb连接
  4. 操作完成后,将调用__exit__,上下文变量值将恢复为None*
  • 如果active_db值为None,路由器将返回default连接

OR 1

你可以简单地使用django动态数据库路由器

此信息也可能有用

或2

您可以为数据库Model:设置自定义Manager

from django.db import models
class AdminMeterLake(models.Model):
...
col_name_1 = models.CharField(max_length=50)
col_name_2 = models.CharField(max_length=50)
...
objects = models.Manager().using('testdb')

在这种情况下,默认情况下将使用testdb,您可以设置queryset = AdminMeterLake.objects.all()而不是queryset = AdminMeterLake.objects.using('testdb').all()

  • models.Manager().using('testdb')方法我没有测试过,它只是理论解(但我想它应该有效…(

通过使用绝对老板rzlvmp的解决方案一,我能够获得一个能够按需路由数据库的工作解决方案。

项目设置:

'''
DATABASE_ROUTERS = [
'config.routers.DynamicDatabaseRouter',
]
'''

config/routers.py,我用manage.py 将其放入根目录

from contextvars import ContextVar
active_db = ContextVar("DB to use", default=None)
def get_active_db():
# return default connection if not set
db = active_db.get(None)
return db if db else 'default'
def set_active_db(connection_name):
return active_db.set(connection_name)
class DynamicDatabaseRouter:
@staticmethod
def _get_db(*args, **kwargs):
db = get_active_db()
return db
db_for_read = _get_db
db_for_write = _get_db

config/context.py

from contextlib import ContextDecorator
from config.routers import set_active_db

class db(ContextDecorator):
def __init__(self, connection_name):
self.connection_name = connection_name
def __enter__(self):

set_active_db(self.connection_name)
return self
def __exit__(self, *exc):
set_active_db(None)
return False

对于我的serializers.py

from .models import AdminMeterLake
from rest_framework import request, serializers
class AdminMeterLakeSerializer(serializers.ModelSerializer):
class Meta:
model = AdminMeterLake
fields = ['testa', 'testb', 'testc']

请注意,我没有包括所有进口

from config.context import db
class AdminView(
mixins.ListModelMixin, 
mixins.CreateModelMixin,
generics.GenericAPIView):
serializer_class = AdminMeterLakeSerializer
queryset = AdminMeterLake.objects.using('testdb').all()
def get(self, request, *args, **kwargs):
return self.list(request, *args, **kwargs)
def post(self, request, *args, **kwargs):
with db('testdb'):
serializer = AdminMeterLakeSerializer(data=request.data)
if serializer.is_valid():
serializer.save()
return Response(serializer.data)
return Response(serializer.errors)

最新更新