Broken pipe error when using @shared_task in Django



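I'm using the Celery library to run a task in the background, so that it keeps running even when the user closes or refreshes the page. I chose Celery because the task takes a while to complete.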

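The algorithm works fine as long as I don't refresh or close the page. When I do, however, I get a "Broken pipe from ('127.0.0.1', 56578)" error. I also tried updating the account after nonFollowers_users() finishes, but then I get an error saying the MySQL server has gone away.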

for id in accounts:
    account = Account.objects.get(id=id)

    # below method is a @shared_task from Celery
    # (calling it directly runs it in this process; .delay() would queue it for a worker)
    nonFollowers_users(account, count, sleep)
try:
    return JsonResponse({'success': True})
except (ConnectionResetError, BrokenPipeError):
    # the connection was closed, return a response with an appropriate status code
    return HttpResponse(status=499)

Sorry for the delay. Here's my basic structure:

General

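I believe it's mostly self-explanatory; the only curveball is the lock system, which keeps you from starting multiple threads that could conflict with each other. Because of it, you don't call the threaded function directly. Instead you go through a pass-through (thread_switchboard in the example), which can return the thread's status immediately.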

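Personally, I use {thread}.lock files. I also added a cache-based version, which is objectively better, but I think it will only work in a production environment.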

Structure (tree)

.
├── my_django
│   └── settings.py
├── common
│   ├── decorators.py
│   ├── threaded_functions.py
│   └── tmp
│       └── .gitignore
└── my_app
    └── views.py

decorators.py

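import functools
from threading import Thread


def start_new_thread(function):
    @functools.wraps(function)  # keep the wrapped function's name and docstring
    def decorator(*args, **kwargs):
        t = Thread(target=function, args=args, kwargs=kwargs)
        t.daemon = True  # don't keep the process alive just for this thread
        t.start()
        return t
    return decorator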
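By default, an uncaught exception inside a Thread target is handed to threading.excepthook (Python 3.8+), which prints the traceback to stderr rather than passing it through Django's logging; that is why the answer catches and logs errors by hand inside threaded_nonfollowers. A minimal sketch of a variant decorator that does the logging once for every threaded function. start_new_thread_logged and flaky_job are hypothetical names, and reusing the "django" logger is an assumption borrowed from the answer's own code.

```python
import functools
import logging
import threading

logger = logging.getLogger("django")  # assumption: same logger name the answer uses


def start_new_thread_logged(function):
    """Hypothetical variant of start_new_thread that logs crashes itself."""

    def target(*args, **kwargs):
        try:
            function(*args, **kwargs)
        except Exception:
            # logger.exception logs at ERROR level and attaches the traceback
            logger.exception("Background thread %s crashed", function.__name__)

    @functools.wraps(function)
    def decorator(*args, **kwargs):
        t = threading.Thread(target=target, args=args, kwargs=kwargs, daemon=True)
        t.start()
        return t  # returned so callers (or tests) can join() when they need to

    return decorator


@start_new_thread_logged
def flaky_job():
    # hypothetical job that always fails, to show the crash being logged
    raise ValueError("boom")
```

Returning the Thread object costs nothing in the view (the return value can simply be ignored), but it makes the decorated function easy to exercise in tests.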

threaded_functions.py

from common.decorators import start_new_thread

# Thread Helpers
#   for keeping track of which ones are currently running
#       so we don't spin up duplicates
def thread_storage():
    # False -> common/tmp/{name}.lock files, True -> Django's cache
    use_cache = False
    return use_cache

def create_thread_lock(name):
    from datetime import datetime
    details = {
        # isoformat(): json.dumps() cannot serialize a datetime object directly
        'start_time': datetime.now().isoformat(),
    }
    if thread_storage():
        from django.core.cache import cache
        seconds = 600  # 10 min
        cache.set(f'Thread:{name}', details, seconds)
    else:
        import json
        with open(f'common/tmp/{name}.lock', 'w', encoding='utf-8') as file:
            file.write(json.dumps(details, sort_keys=True, indent=2))

def is_running(name):
    # You could even pull data out of storage, like start_time
    if thread_storage():
        from django.core.cache import cache
        return bool(cache.get(f'Thread:{name}'))
    import os
    return os.path.isfile(f'common/tmp/{name}.lock')

def remove_thread_lock(name):
    if thread_storage():
        from django.core.cache import cache
        cache.delete(f'Thread:{name}')
    else:
        import os
        target = f'common/tmp/{name}.lock'
        if os.path.isfile(target):
            os.remove(target)
# Thread Helpers - Done
# Actual Threads
#   We **don't** want to call these directly (see next block for call)
@start_new_thread
def threaded_nonfollowers(request, name):
    from django.db import connections
    # Wrapped so we can handle errors
    try:
        from my_app.models import Account  # assumed location of the Account model
        # do long actions inside here!
        accounts = request.POST.get('accounts').split(',')  # or something?
        for account_id in accounts:
            account = Account.objects.get(id=account_id)
            # Do actual contents of nonFollowers_users right here

        # remove lock, thread spinning down
        remove_thread_lock(name)
    except Exception:
        # Crash
        # - Remove Lock
        remove_thread_lock(name)
        # Send crash report
        #   Django doesn't report thread crashes by default (odd)
        #
        # We passed `request` this entire time just so the error report will be
        #   usable to recreate the problem (POST data)
        import logging
        logger = logging.getLogger('django')
        # logger.exception() attaches the current traceback automatically
        logger.exception('Thread %s crashed', name,
                         extra={'status_code': 500, 'request': request})
    finally:
        # Django gives each thread its own DB connection; close it when the
        #   thread is done instead of leaving it open
        connections.close_all()
# Actual Threads - Done

# Thread Handler
#   Could break these up, to be more specific (ex: est times) but heck- switchboard!
def thread_switchboard(request, name):
    threaded_functions_by_name = {
        'nonfollowers': threaded_nonfollowers,
    }
    func = threaded_functions_by_name.get(name, None)  # <- love these
    if func:
        # is valid function, is it running?
        if not is_running(name):
            # not running: create the lock, start function, return msg
            create_thread_lock(name)
            func(request, name)
            return {'status': True, 'msg': 'Started thread'}
        return {'status': False, 'msg': 'Thread is already running'}
    return {'status': False, 'msg': 'Invalid function'}
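Checking is_running() and then calling create_thread_lock() are two separate steps, so two requests arriving at almost the same moment can both see "not running" and both start the thread. A minimal standard-library sketch of one way to close that gap (a swapped-in technique, not the answer's own code): create the lock file atomically with os.open(..., O_CREAT | O_EXCL), which fails if the file already exists, and release it in a finally block. LOCK_DIR, try_create_lock, remove_lock and switchboard are hypothetical names; the temporary directory stands in for common/tmp/, and the request argument is dropped to keep the sketch self-contained.

```python
import os
import tempfile
import threading

LOCK_DIR = tempfile.mkdtemp()  # assumption: stands in for common/tmp/


def try_create_lock(name):
    """Atomically create <name>.lock; return False if it already exists."""
    path = os.path.join(LOCK_DIR, f"{name}.lock")
    try:
        # O_CREAT | O_EXCL turns "does it exist?" and "create it" into one step
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    os.close(fd)
    return True


def remove_lock(name):
    try:
        os.remove(os.path.join(LOCK_DIR, f"{name}.lock"))
    except FileNotFoundError:
        pass


def switchboard(name, jobs):
    """Hypothetical stand-in for thread_switchboard; jobs maps names to callables."""
    func = jobs.get(name)
    if func is None:
        return {"status": False, "msg": "Invalid function"}
    if not try_create_lock(name):
        return {"status": False, "msg": "Thread is already running"}

    def run():
        try:
            func()
        finally:
            remove_lock(name)  # released even if func raises

    threading.Thread(target=run, daemon=True).start()
    return {"status": True, "msg": "Started thread"}
```

Because the lock is released in finally, an exception inside the job no longer leaves a stale lock behind; a hard kill of the process still can, which is the case the side note at the end of this answer deals with. Atomic O_EXCL creation is reliable on local filesystems; network filesystems may not guarantee it.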

views.py

from django.http import Http404, JsonResponse


def account_nonfollowers_post(request):
    if request.method == 'POST':
        # Status:
        #   True  -> Good!
        #   False -> Bad!
        data = {'status': False}  # assume bad
        if request.POST.get('accounts'):
            # Post is as expected.. continue..
            from common.threaded_functions import thread_switchboard
            data = thread_switchboard(request, name='nonfollowers')
            # - will immediately return:
            #   a: True  + Started
            #   b: False + Running
            #   c: False + Invalid
        else:
            data['msg'] = 'accounts were not provided'
        return JsonResponse(data)
    raise Http404

common/tmp/.gitignore (for the .lock files)

*
*/
!.gitignore

This ignores every file in tmp, but still commits the folder's existence by keeping the .gitignore itself.

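Side note: the main problem with this solution is that if the server goes down before the .lock file is removed, the thread won't start again until the file is deleted by hand. The cache's expiry mitigates this somewhat.
I worked around it by adding an expiry timestamp (now + estimated duration + grace period) to my .lock file, so instead of only checking whether the file exists, I also check whether it has expired.
But I found this to be an edge case. Functionally, it's a good core that you can make as complex or as simple as you need. Let me know if you run into problems! (: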
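The expiry check described in this side note can be sketched as follows, assuming timestamps are stored as Unix seconds from time.time(). LOCK_DIR stands in for common/tmp/, and est_seconds, grace_seconds and the optional now parameter (there only to make the check testable) are hypothetical; this is an illustration of the idea, not the author's actual lock format.

```python
import json
import os
import tempfile
import time

LOCK_DIR = tempfile.mkdtemp()  # assumption: stands in for common/tmp/


def create_thread_lock(name, est_seconds, grace_seconds=60, now=None):
    """Write a lock whose expiry is now + estimated run time + grace period."""
    now = time.time() if now is None else now
    details = {"start_time": now, "expires_at": now + est_seconds + grace_seconds}
    with open(os.path.join(LOCK_DIR, f"{name}.lock"), "w", encoding="utf-8") as f:
        json.dump(details, f)


def is_running(name, now=None):
    """True only if the lock file exists AND has not expired yet."""
    path = os.path.join(LOCK_DIR, f"{name}.lock")
    try:
        with open(path, encoding="utf-8") as f:
            details = json.load(f)
    except (FileNotFoundError, ValueError):
        # no lock file, or unreadable JSON: treat the job as not running
        return False
    now = time.time() if now is None else now
    return now < details.get("expires_at", 0)
```

A stale lock left behind by a crash simply stops counting once its expires_at has passed, and the next create_thread_lock() call overwrites it because the file is opened in "w" mode.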
