How can we share an in-memory class across a FastAPI application?



I need to create a governor class that has a list as an attribute. Every time a request hits FastAPI, a token element is popped from the list and passed along to a background process. Once the background process finishes, it should put that token back into the same list.

governor.py

from typing import List
from datetime import datetime
from collections import deque
import threading
from decouple import AutoConfig
from fastapi import HTTPException
from http import HTTPStatus

config = AutoConfig(search_path='.env')

class JobToken:
    """
    Job Token component class with 4 properties
    """
    identifier = str()
    checked_out_at = None
    returned_at = None
    deltatime_seconds = float()

class Governor:
    """
    Singleton class with shared state.

    Holds a private deque __lss (used as a LIFO stack) that cannot be
    accessed from outside the class.
    """
    instance = None

    # deque instead of list: deque provides O(1) appends and pops at both
    # ends, whereas popping from the front of a list is O(n).
    __lss = deque()
    for counter in range(1, int(config('MAX_BACKGROUND_TASKS')) + 1):
        token = JobToken()
        token.identifier = 'JT_' + str(counter)
        __lss.append(token)
    del counter, token  # avoid leaking the loop variables as class attributes

    def __new__(cls, *args, **kwargs) -> "Governor":
        # Create only one instance, making the class a singleton
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    # Borg pattern: share the same state between different instances
    __shared_state = {}

    def __init__(self) -> None:
        self.__dict__ = self.__shared_state
        # create the lock only once; __init__ runs on every Governor() call,
        # and replacing a lock that another thread holds would break safety
        if '_lock' not in self.__dict__:
            self._lock = threading.Lock()

    def get_token(self) -> JobToken:
        """
        Checks the deque length and, if a token is available, stamps it with
        a checkout time before handing it out.

        Returns:
            JobToken: object popped from the deque
        """
        with self._lock:
            # Thread-safe: the lock prevents multiple threads from
            # manipulating the deque at the same time, so no token is lost.
            if len(self.__lss) > 0:
                token = self.__lss.pop()
                token.checked_out_at = datetime.now()
                token.returned_at = None
                token.deltatime_seconds = None
                return token
            raise HTTPException(status_code=HTTPStatus.TOO_MANY_REQUESTS)

    def return_token(self, token: JobToken) -> None:
        """
        Puts a token back into the deque so it can be borrowed again by
        another process. The timestamps are not cleared here because we want
        the history to stay associated with the token.

        Args:
            token (JobToken): JobToken object being returned
        """
        with self._lock:
            # Thread-safe: the lock prevents multiple threads from
            # manipulating the deque at the same time.
            if token:
                token.returned_at = datetime.now()
                token.deltatime_seconds = (token.returned_at - token.checked_out_at).total_seconds()
                self.__lss.append(token)
            else:
                raise TypeError("Return Token not found")

    def inspect_token_list(self) -> List[JobToken]:
        """
        Returns a copy, so mutations have no effect on the real bank of tokens.

        Returns:
            List[JobToken]: snapshot of the available tokens
        """
        return list(self.__lss)
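The class above combines two sharing mechanisms: the `__new__` singleton and the Borg `__shared_state` dict. A minimal, self-contained sketch (with illustrative names, not part of the application) of how the Borg half shares state between instances:

```python
import threading
from collections import deque

class TokenPool:
    """Borg pattern: every instance is bound to the same __dict__, so an
    attribute set through any instance is visible through all of them."""
    _shared_state = {}

    def __init__(self):
        self.__dict__ = self._shared_state
        if "tokens" not in self.__dict__:  # initialise the shared state once
            self.tokens = deque(["JT_1", "JT_2"])
            self.lock = threading.Lock()

    def pop(self):
        with self.lock:
            return self.tokens.pop()

a = TokenPool()
b = TokenPool()
a.pop()  # b observes the mutation: both instances share one deque
```

Unlike the `__new__` route, the Borg pattern allows many instance objects to exist; they simply all point at one state dict, which is why `__init__` must guard against re-initialising it.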

main.py

import asyncio
from http import HTTPStatus
from fastapi import FastAPI, BackgroundTasks

from governor import Governor
from processors import process_data
from config import STAGE_Y

app = FastAPI()

async def run_in_process(fn, *args):
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(app.state.executor, fn, *args)

@app.post("/likelySuccess/{guid}", status_code=HTTPStatus.ACCEPTED)
async def executeModel_B(guid: str, background_tasks: BackgroundTasks):
    """
    API route for likelySuccess.

    Args:
        guid (str): path parameter used to fetch the corresponding data
        background_tasks (BackgroundTasks): task object provided by FastAPI

    Returns:
        dict: empty JSON body
    """
    token = Governor().get_token()
    process_identifier = STAGE_Y
    try:
        background_tasks.add_task(run_in_process, process_data, guid, process_identifier, token)
        return {}
    except Exception:
        # put the token back so it is not leaked if scheduling fails
        Governor().return_token(token)
        raise

processors.py

from governor import Governor

def process_data(guid: str, process_identifier: str, token):
    """Some data processing"""
    Governor().return_token(token)
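Note that `process_data` runs inside a `ProcessPoolExecutor` worker, i.e. in a separate process with its own memory: the `Governor()` it constructs there is not the object the web process popped the token from, so the return never reaches the original deque. A minimal sketch of that isolation (assuming a Unix `fork` start method; the names are illustrative):

```python
import multiprocessing
from concurrent.futures.process import ProcessPoolExecutor

POOL = ["JT_1"]  # stands in for the Governor's deque

def take_in_child() -> int:
    # Runs in the worker: mutates the *child's* copy of POOL
    POOL.pop()
    return len(POOL)

def demo():
    ctx = multiprocessing.get_context("fork")  # child starts as a copy of the parent
    with ProcessPoolExecutor(max_workers=1, mp_context=ctx) as ex:
        child_len = ex.submit(take_in_child).result()
    # the child saw its copy shrink to 0; the parent's POOL is untouched
    return child_len, len(POOL)
```

This is why purely in-process sharing (singleton, Borg, module globals) only works within one process, and an external store is needed once work crosses a process boundary.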

You could use Redis for this. It holds the tokens in memory, and Python code running in any thread (or any process) can access them.

A Redis list can be used for this task.

Docs: https://redis.io/docs/data-types/tutorial/#lists

> rpush mylist token1
(integer) 1
> rpush mylist token2
(integer) 2
> rpop mylist
"token1"

There is a very handy Python client library for sending these commands from code: https://github.com/redis/redis-py
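A minimal sketch of the same checkout/return flow on top of a Redis list (the key name `job_tokens` and the function names are my own illustrations; in production `client` would be a `redis.Redis` instance, but anything exposing the same list commands works):

```python
def init_pool(client, max_tasks: int) -> None:
    """Reset the shared pool to tokens JT_1 .. JT_<max_tasks>."""
    client.delete("job_tokens")
    for i in range(1, max_tasks + 1):
        client.rpush("job_tokens", f"JT_{i}")

def get_token(client):
    """Pop a token from the head of the list; None when the pool is exhausted."""
    return client.lpop("job_tokens")

def return_token(client, token) -> None:
    """Push the token back so another request can borrow it."""
    client.rpush("job_tokens", token)

# Usage against a real server (not executed here):
#   import redis
#   client = redis.Redis()
#   init_pool(client, 4)
```

Because Redis executes each command atomically, no application-side lock is needed, and every worker process talks to the same pool.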

Another database might work too, but Redis is the simplest option.
