为什么 ThreadedProcessPoolExecutor 卡住了?



我正在运行一个 Django 服务器,并通过 ThreadedProcessPoolExecutor 并行运行一个处理函数。但在正常运行一段时间后,它卡住了!

我对此做了一些研究,现在认为这是死锁(deadlock)。我尝试修改了各种地方,但都没有奏效。

下面是这段代码。

import json
import os
from time import sleep
import shutil
from HardCode.scripts import BL0
from HardCode.scripts.cibil.Analysis import analyse
from HardCode.scripts.cibil.apicreditdata import convert_to_df
from analysisnode.settings import PROCESSING_DOCS, CHECKSUM_KEY, FINAL_RESULT
from threadedprocess import ThreadedProcessPoolExecutor
from analysisnode import Checksum
import requests

def _cibil_only_response(user_id, user_data, cibil_df, message):
    """Build a response that carries only the CIBIL-based limit.

    The SMS-derived fields are filled with the -9 placeholder, exactly as the
    original inline dicts did.
    """
    limit = analyse(user_id=user_id, current_loan=user_data['current_loan_amount'], cibil_df=cibil_df,
                    new_user=user_data['new_user'], cibil_score=user_data['cibil_score'])
    return {
        "cust_id": user_id,
        "status": True,
        "message": message,
        "result": {
            "loan_salary": -9,
            "loan": -9,
            "salary": -9,
            "cibil": limit,
        },
    }


def parallel_proccess_user_records(user_id):
    """Process one user's job directory and write its result.

    Reads ``user_data.json``, the optional ``experian_cibil.xml`` and
    ``sms_data.json`` from ``PROCESSING_DOCS/<user_id>``. Writes the response
    to ``FINAL_RESULT/<user_id>/user_data.json``. Removes the processing
    directory only after the result has been written.

    If BL0 fails, a CIBIL-only fallback response is written instead.
    """
    processing_dir = PROCESSING_DOCS + str(user_id)
    result_dir = FINAL_RESULT + str(user_id)

    with open(processing_dir + '/user_data.json') as user_file:
        user_data = json.load(user_file)

    cibil_df = {'status': False, 'data': None, 'message': 'None'}
    cibil_path = processing_dir + '/experian_cibil.xml'
    if os.path.exists(cibil_path):
        # NOTE(review): assumes convert_to_df consumes the file eagerly
        # (its result replaces a plain status/data/message dict) -- confirm.
        with open(cibil_path) as cibil_file:
            cibil_df = convert_to_df(cibil_file)

    with open(processing_dir + '/sms_data.json', 'rb') as sms_file:
        sms_json = json.load(sms_file)

    try:
        if len(sms_json) == 0:
            response_bl0 = _cibil_only_response(
                user_id, user_data, cibil_df, "No messages found in sms_json")
        else:
            response_bl0 = BL0.bl0(cibil_xml=cibil_df, cibil_score=user_data['cibil_score'], user_id=int(user_id),
                                   new_user=user_data['new_user'], list_loans=user_data['all_loan_amount'],
                                   current_loan=user_data['current_loan_amount'], sms_json=sms_json)
    except Exception as e:
        print(f"error in middleware {e}")
        response_bl0 = _cibil_only_response(
            user_id, user_data, cibil_df, "Exception occurred, I feel lonely in middleware")

    # Create the result directory on every path. Previously the fallback path
    # could reach the write below without it and die with FileNotFoundError.
    os.makedirs(result_dir, exist_ok=True)
    with open(result_dir + '/user_data.json', 'w') as json_file:
        json.dump(response_bl0, json_file, ensure_ascii=True, indent=4)

    # Drop the input only once the result is safely on disk. Leaving it behind
    # after a fallback made the polling loop reprocess this user forever.
    shutil.rmtree(processing_dir)

def process_user_records(user_ids):
    """Run ``parallel_proccess_user_records`` for every id in the pool.

    Waits for every job to finish. Each job's outcome is retrieved
    explicitly, so a worker exception is reported (per user) instead of being
    silently discarded. Previously the lazy iterator returned by ``map`` was
    never consumed, which hid those errors. One failing user does not abort
    the rest of the batch.
    """
    with ThreadedProcessPoolExecutor(max_processes=8, max_threads=16) as p:
        futures = {p.submit(parallel_proccess_user_records, uid): uid for uid in user_ids}
        for future, uid in futures.items():
            try:
                future.result()
            except Exception as exc:
                print(f"processing failed for user {uid}: {exc!r}")

if __name__ == "__main__":
    # Poll PROCESSING_DOCS forever. Each sub-directory name is treated as a
    # user_id whose job is waiting to be processed.
    while True:
        # List the folder once. Keep only directories: a stray plain file
        # would otherwise be handed to the worker, fail, and be retried on
        # every cycle.
        user_ids = [
            entry for entry in os.listdir(PROCESSING_DOCS)
            if os.path.isdir(os.path.join(PROCESSING_DOCS, entry))
        ]
        if user_ids:
            process_user_records(user_ids)
            print("***********")
            print("Done : ")
            print(user_ids)
        print("SLEEPING.....zzzzzz")
        sleep(10)

代码说明:此代码在无限循环中运行,每隔 10 秒检查一次 PROCESSING_DOCS 所指向的文件夹。如果该文件夹中出现新的条目(以 user_id 命名的子目录),就对所有条目并行运行处理函数;否则保持空闲。

我在这里做错了什么?

我不确定,但您可以尝试改用 ProcessPoolExecutor:它使用进程池异步执行调用,从而绕开 GIL。

相关内容

  • 没有找到相关文章