我希望运行一个在sys.stdin
上读取流的Web服务器。该读取需要连续发生,例如在一段时间循环中。
但是,我也希望运行一个 Flask 服务器,该服务器侦听要/data
的请求,并将从sys.stdin
读取的最后一位数据发送到请求代理。
到目前为止,我发现我的 while 循环正在停止我的应用程序的执行,这绝对有意义。这是我的设置:
from flask import Flask, jsonify
import sys
# state
frames = []
frame = []
while True:
l = sys.stdin.readline()
if 'end_frame' in l:
frames = [frame] + frames
frame = []
elif l.rstrip('n'):
frame.append(l.rstrip('n'))
# app
app = Flask(__name__, static_url_path='')
@app.route('/frames')
def get_frames():
return jsonify(frames)
app.run(host='0.0.0.0', port=5050)
有没有办法将该while
循环作为后台进程运行,以释放烧瓶路由侦听器?任何建议都会有所帮助!
尝试查看类似 BackgroundScheduler 的东西。它将任务作为单独的线程在后台运行,而不会使烧瓶侦听器停止。
from apscheduler.schedulers.background import BackgroundScheduler
...
...
def readlines():
l = sys.stdin.readline()
if 'end_frame' in l:
frames = [frame] + frames
frame = []
elif l.rstrip('n'):
frame.append(l.rstrip('n'))
with app.app_context():
scheduler = BackgroundScheduler()
scheduler.add_job(readlines, 'interval', seconds=10)
scheduler.start()
您可以尝试在线程中运行它
import threading
x = threading.Thread(target=thread_function, args=(index,))
threads.append(x)
x.start()
对于线程函数,在其中定义 while 循环
我最终做了以下事情:
我有一个小文件publisher.py
,它从给定主机上的端口读取(尽管它也可以使用上面的代码从 sys.stdin 读取(在 while 循环中。当它组成一个frame
时,它会将该帧发布到 redis 数据存储。
然后,在我的 Flask 路由侦听器/frame
中,我只需 ping redis数据存储,然后jsonify
结果。通过这种方式,我的发布者和服务器协同工作,在数据进入时向客户端提供数据......
publisher.py:
import sys, socket, redis, json
# config
stream = {'host': '127.0.0.1', 'port': 6000} # streaming data host / port
r = redis.Redis(host='127.0.0.1', port=6379) # redis instance host / port
# consume data from a host+port and publish to redis on localhost
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((socket.gethostbyname(stream['host']), stream['port'])) # host, port
# consume data
frame = [] # initialize the container obj that will hold all frame data
while True:
data = client.recv(1024).decode('utf8')
for l in data.split('n'):
if 'end_frame' in l:
d = {i.split(':')[0]: i.split(':')[1] for i in frame if ':' in i}
r.set('frame', json.dumps(d))
frame = []
print(' * published frame', d.get('frame_number', ''))
elif l.rstrip('n'):
frame.append(l.rstrip('n'))
server.py:
from flask import Flask, jsonify
import redis, sys, os, json
# app
app = Flask(__name__, static_url_path='')
# redis
r = redis.Redis(host='127.0.0.1', port=6379) # redis instance host / port
# route listeners
@app.route('/api/frame')
def get_frame():
frame = json.loads(r.get('frame').decode('utf8'))
return jsonify(frame)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5050)