与烧瓶或其他Python WebFramework并行生成和处理Websocket输出的背景线程



好吧,我知道那是一口。我很茫然如何处理这个问题。我想运行与烧瓶并行处理数据的线程,但是我没有看到很多人这样做。这是针对个人网络应用程序的,所以我不想走芹菜和兔子的道路。我已经创建了一个连接到股票经纪人API并流股票数据的模块。为了简单起见,只需说它以每秒1个数字的速率生成一个随机数。在由数字生成器线程创建的新数据的每个刻度上,我希望其他一些线程处理相同的数字。让我们称它们为数学线程。一旦他们完成了处理最新的数字,我希望将结果组合在一起(将其组合为JSON)并通过Websocket发送。我已经独立地使用烧瓶汤来通过Websocket发送数据。这是我想完成的内容的例证,可以将每个框视为线程。

                     -------------
                     |  Number   |
                     | Generator | (rate of 1 number / second)
                     -------------
                           |  (same number sent to all 3 math threads)
                      -----+----- 
                     /     |     
                    v      v      v
            ---------  ---------  ---------
            | math1 |  | math2 |  | math3 |
            ---------  ---------  ---------
                |          |          |  (results combined and sent over Websocket)
                v          v          v
    ---------------------------------------
    | Flask |       WebSocket Handler     |
    ---------------------------------------

这是简单的Websocket代码。

app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/stream')
def stream_socket(ws):
    while True: # for now just sending endless stream of time + random number every second
        message = {"time": int(time.time()*1000), "data": randrange(100)}
        ws.send(json.dumps(message))
        time.sleep(1)
@app.route('/')
def test():
    return render_template('main.html')
if __name__ == '__main__':
    app.run(host='0.0.0.0',debug=True,threaded=True)

因此,我有"数字生成器"模块(实际上是库存报价流)和Websocket连接独立工作。我只需要将它们与螺纹连接在一起,这就是我正在挣扎的地方。如果该数字发生器是一个简单的随机数生成器,并且数学线程也很简单(例如2*x,sinc(x)等)并行与烧瓶进行线程。也许是一些骨架代码。谢谢。

更新:我可以得到单独的线程与烧瓶并行运行,就像以下简单的内容一样。当我运行" python test.py"时,这起作用了,但是要让Websocket上班,我使用像gunicorn -k blask_sockets.worker test:app'这样的gunicorn。但是,这似乎阻止了多线程的工作。

Update2:我能够使用GEVENT而不是Gunicorn获得Websocket和多线程的工作。下面更新的代码。

from gevent import pywsgi
from geventwebsocket.handler import WebSocketHandler
class myThread(threading.Thread):
    def __init__(self, wait, msg):
        super(myThread, self).__init__()
        self.wait = wait
        self.msg = msg
    def run(self):
        for i in range(5):
            time.sleep(self.wait)
            print self.msg
app = Flask(__name__)
sockets = Sockets(app)
@sockets.route('/stream')
def stream_socket(ws):
    while True: # for now just sending endless stream of time + random number every second
        message = {"time": int(time.time()*1000), "data": randrange(100)}
        ws.send(json.dumps(message))
        time.sleep(1)
@app.route('/')
def test():
    return render_template('main.html')
if __name__ == '__main__':
    thread1 = myThread(2, "thread1")
    thread2 = myThread(3, "thread2")
    thread1.start()
    thread2.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()
    #app.run(host='0.0.0.0',debug=True,threaded=True)

现在,我已经能够实现运行的并行线程以在websocket上独立发送数据时,我想我现在最大的问题是如何与Websocket Decorator共享并行工作人员线程的结果因此,它可以发送由这些单独线程处理的数据。有没有办法将 @sockets.route装饰器放置在线程中?

好吧,这样的事情似乎有效。单独的线程以每秒1的速率生成数字,并将其放在队列中。Websocket处理程序可用时从队列中获取数据,并将其发送到Websocket。显然,这只是一个简单的例子,但是它使我朝着正确的方向前进。如果有人有任何建议,仍然会很好奇。

app = Flask(__name__)
sockets = Sockets(app)
myQueue = Queue.Queue(10)
class myThread(threading.Thread):
    def __init__(self, length):
        super(myThread, self).__init__()
        self.length = length
    def run(self):
        for i in range(self.length):
            time.sleep(1)
            myQueue.put({"time": int(time.time()*1000), "data": randrange(100)})

@sockets.route('/stream')
def stream_socket(ws):
    while True:
        message = myQueue.get()
        ws.send(json.dumps(message))

@app.route('/')
def test():
    return render_template('main.djhtml')
if __name__ == '__main__':
    thread1 = myThread(30)
    thread1.start()
    server = pywsgi.WSGIServer(('', 5000), app, handler_class=WebSocketHandler)
    server.serve_forever()

最新更新