在 Flask 应用程序中作为后台进程循环运行



我希望运行一个在sys.stdin上读取流的Web服务器。该读取需要连续发生,例如在一段时间循环中。

但是,我也希望运行一个 Flask 服务器,该服务器侦听要/data的请求,并将从sys.stdin读取的最后一位数据发送到请求代理。

到目前为止,我发现我的 while 循环正在停止我的应用程序的执行,这绝对有意义。这是我的设置:

from flask import Flask, jsonify
import sys
# state
frames = []
frame = []
while True:
  l = sys.stdin.readline()
  if 'end_frame' in l:
    frames = [frame] + frames
    frame = []
  elif l.rstrip('n'):
    frame.append(l.rstrip('n'))
# app
app = Flask(__name__, static_url_path='')
@app.route('/frames')
def get_frames():
  return jsonify(frames)
app.run(host='0.0.0.0', port=5050)

有没有办法将该while循环作为后台进程运行,以释放烧瓶路由侦听器?任何建议都会有所帮助!

尝试查看类似 BackgroundScheduler 的东西。它将任务作为单独的线程在后台运行,而不会使烧瓶侦听器停止。

from apscheduler.schedulers.background import BackgroundScheduler
...
...
def readlines():
  l = sys.stdin.readline()
  if 'end_frame' in l:
    frames = [frame] + frames
    frame = []
  elif l.rstrip('n'):
    frame.append(l.rstrip('n'))
with app.app_context():
    scheduler = BackgroundScheduler()
    scheduler.add_job(readlines, 'interval', seconds=10)
    scheduler.start()

您可以尝试在线程中运行它

import threading
x = threading.Thread(target=thread_function, args=(index,))
threads.append(x)
x.start()

对于线程函数,在其中定义 while 循环

我最终做了以下事情:

我有一个小文件publisher.py,它从给定主机上的端口读取(尽管它也可以使用上面的代码从 sys.stdin 读取(在 while 循环中。当它组成一个frame时,它会将该帧发布到 redis 数据存储。

然后,在我的 Flask 路由侦听器/frame 中,我只需 ping redis数据存储,然后jsonify结果。通过这种方式,我的发布者和服务器协同工作,在数据进入时向客户端提供数据......

publisher.py

import sys, socket, redis, json
# config
stream = {'host': '127.0.0.1', 'port': 6000} # streaming data host / port
r = redis.Redis(host='127.0.0.1', port=6379) # redis instance host / port
# consume data from a host+port and publish to redis on localhost
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((socket.gethostbyname(stream['host']), stream['port'])) # host, port
# consume data
frame = [] # initialize the container obj that will hold all frame data
while True:
  data = client.recv(1024).decode('utf8')
  for l in data.split('n'):
    if 'end_frame' in l:
      d = {i.split(':')[0]: i.split(':')[1] for i in frame if ':' in i}
      r.set('frame', json.dumps(d))
      frame = []
      print(' * published frame', d.get('frame_number', ''))
    elif l.rstrip('n'):
      frame.append(l.rstrip('n'))

server.py

from flask import Flask, jsonify
import redis, sys, os, json
# app
app = Flask(__name__, static_url_path='')
# redis
r = redis.Redis(host='127.0.0.1', port=6379) # redis instance host / port
# route listeners
@app.route('/api/frame')
def get_frame():
  frame = json.loads(r.get('frame').decode('utf8'))
  return jsonify(frame)
if __name__ == '__main__':
  app.run(host='0.0.0.0', port=5050)

最新更新