在不终止进程的情况下中止Python进程中的代码执行



假设我有一个(websocket(API,api.py,例如:

from flask import Flask, request
from flask_socketio import SocketIO, emit
from worker import Worker
app = Flask()
socketio = SocketIO(app)
worker = Worker()
worker.start()
@socketio.on('connect')
def connect():
print("Client", request.sid, "connected")
@socketio.on('get_results')
def get_results(query):
"""
The only endpoing of the API.
"""
print("Client", request.sid, "requested results for query", query)
# Set the worker to work, wait for results to be ready, and
# send the results back to the client.
worker.task_queue.put(query)
results = worker.result_queue.get()
emit("results", results)
@socketio.on('disconnect')
def disconnect():
print("Client", request.sid, "disconnected, perhaps before results where ready")
# What to do here?
socketio.run(app, host='')

API将为许多客户端提供服务,但只有一个工作者来产生应该提供的结果。worker.py:

from multiprocessing import Process, Queue
class Worker(Process):
def __init__(self):
super().__init__()
self.task_queue = Queue()
self.result_queue = Queue()
self.some_stateful_variable = 0
# Do other computationally expensive work

def reset_state(self):
# Computationally inexpensive.
pass
def do_work(self, task):
# Computationally expensive. Takes long time.
# Modifies internal state.
pass
def run(self):
while True:
task = self.task_queue.get()
results = self.do_work(task)
self.result_queue.put(results)

工人收到一个请求,即要做的任务,并着手产生结果。当结果准备好时,将向客户端提供它。

但并非所有客户都有耐心。在结果准备好之前,他们可能会离开,即与API断开连接。他们不想要它们,因此工人最终完成了一项不需要完成的任务。这使得队列中的其他客户端不必要地等待。如何避免这种情况,并让工作人员为不需要完成的任务中止执行do_work

  1. 在客户端:当用户关闭浏览器选项卡或将页面发送请求留给Flask服务器时,请求应包含要取消的任务的id。

  2. 在服务器端,将任务的取消状态放入数据库或Flask server和Worker Process 之间的任何共享变量中

  3. 将任务处理划分为几个阶段,并在每个阶段之前检查数据库中任务的状态,如果状态已取消,则停止任务处理。

第1点的另一个选择是在服务器端,在来自客户端的状态请求之间的单独进程计数间隔中进行一些监控。

我通过以下方式启动了一个完全独立的流程,从而处理了类似的问题:

sp.call('start python path\worker.py', shell=True)

worker.py会通过redis将其PID报告回api.py,然后直接从api.py 在任何点终止进程

当然,这对你来说有多可行将取决于有多少数据驻留在api.py中并共享给worker.py——是否可行也通过redis传递由你决定。

额外的好处是可以将套接字与繁重的计算解耦,并且可以实现准多核(每个worker.py一个线程(。如果愿意,可以通过将多处理集成到每个worker.py.中来实现全多核。

最新更新