好吧,我知道那是一口。我很茫然如何处理这个问题。我想运行与烧瓶并行处理数据的线程,但是我没有看到很多人这样做。这是针对个人网络应用程序的,所以我不想走芹菜和兔子的道路。我已经创建了一个连接到股票经纪人API并流股票数据的模块。为了简单起见,只需说它以每秒1个数字的速率生成一个随机数。在由数字生成器线程创建的新数据的每个刻度上,我希望其他一些线程处理相同的数字。让我们称它们为数学线程。一旦他们完成了处理最新的数字,我希望将结果组合在一起(将其组合为JSON)并通过Websocket发送。我已经独立地使用烧瓶汤来通过Websocket发送数据。这是我想完成的内容的例证,可以将每个框视为线程。
-------------
| Number |
| Generator | (rate of 1 number / second)
-------------
| (same number sent to all 3 math threads)
-----+-----
/ |
v v v
--------- --------- ---------
| math1 | | math2 | | math3 |
--------- --------- ---------
| | | (results combined and sent over Websocket)
v v v
---------------------------------------
| Flask | WebSocket Handler |
---------------------------------------
这是简单的Websocket代码。
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/stream')
def stream_socket(ws):
while True: # for now just sending endless stream of time + random number every second
message = {"time": int(time.time()*1000), "data": randrange(100)}
ws.send(json.dumps(message))
time.sleep(1)
@app.route('/')
def test():
return render_template('main.html')
if __name__ == '__main__':
app.run(host='0.0.0.0',debug=True,threaded=True)
因此,我有"数字生成器"模块(实际上是库存报价流)和Websocket连接独立工作。我只需要将它们与螺纹连接在一起,这就是我正在挣扎的地方。如果该数字发生器是一个简单的随机数生成器,并且数学线程也很简单(例如2*x,sinc(x)等)并行与烧瓶进行线程。也许是一些骨架代码。谢谢。
更新:我可以得到单独的线程与烧瓶并行运行,就像以下简单的内容一样。当我运行" python test.py"时,这起作用了,但是要让Websocket上班,我使用像gunicorn -k blask_sockets.worker test:app'这样的gunicorn。但是,这似乎阻止了多线程的工作。
Update2:我能够使用GEVENT而不是Gunicorn获得Websocket和多线程的工作。下面更新的代码。
from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
class myThread(threading.Thread):
def __init__(self, wait, msg):
super(myThread, self).__init__()
self.wait = wait
self.msg = msg
def run(self):
for i in range(5):
time.sleep(self.wait)
print self.msg
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/stream')
def stream_socket(ws):
while True: # for now just sending endless stream of time + random number every second
message = {"time": int(time.time()*1000), "data": randrange(100)}
ws.send(json.dumps(message))
time.sleep(1)
@app.route('/')
def test():
return render_template('main.html')
if __name__ == '__main__':
thread1 = myThread(2, "thread1")
thread2 = myThread(3, "thread2")
thread1.start()
thread2.start()
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()
#app.run(host='0.0.0.0',debug=True,threaded=True)
现在,我已经能够实现运行的并行线程以在websocket上独立发送数据时,我想我现在最大的问题是如何与Websocket Decorator共享并行工作人员线程的结果因此,它可以发送由这些单独线程处理的数据。有没有办法将 @sockets.route装饰器放置在线程中?
好吧,这样的事情似乎有效。单独的线程以每秒1的速率生成数字,并将其放在队列中。Websocket处理程序可用时从队列中获取数据,并将其发送到Websocket。显然,这只是一个简单的例子,但是它使我朝着正确的方向前进。如果有人有任何建议,仍然会很好奇。
app = Flask(__name__)
sockets = Sockets(app)
myQueue = Queue.Queue(10)
class myThread(threading.Thread):
def __init__(self, length):
super(myThread, self).__init__()
self.length = length
def run(self):
for i in range(self.length):
time.sleep(1)
myQueue.put({"time": int(time.time()*1000), "data": randrange(100)})
@sockets.route('/stream')
def stream_socket(ws):
while True:
message = myQueue.get()
ws.send(json.dumps(message))
@app.route('/')
def test():
return render_template('main.djhtml')
if __name__ == '__main__':
thread1 = myThread(30)
thread1.start()
server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
server.serve_forever()