dash和使用队列的另一个线程之间的通信



我编写了一个定期接收和处理地理数据的程序。我想在提供plotlyScattergeo图的dash应用程序中绘制由该程序生成的(纬度,经度)对,或多或少随着数据的输入。

下面是一个双线程程序的MWE,它使用队列从地理数据源接收数据并简单地打印它。本例中的数据只是(lat, lon)元组。

import threading
from queue import Queue
from CoordSource import CoordGetter
class Receiver(object):
def __init__(self, q):
while True:
item = q.get()
print("RECEIVED", item)
if __name__ == '__main__':
mainQueue = Queue()
cg = threading.Thread(target=CoordGetter, args=(mainQueue, ))
cg.start()
rt = threading.Thread(target=Receiver, args=(mainQueue, ))
rt.start()

这是一个基本的dash应用程序的MWE,它创建了一个Scattergeo图,并每两秒钟更新一次绘制的数据,但在这种情况下,它只是绘制了两个硬编码的点:

import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
import plotly
import plotly.graph_objects as go
import pandas as pd
app = dash.Dash(__name__, prevent_initial_callbacks=True)
app.layout = html.Div(
html.Div([
dcc.Graph(id='live-update-graph'),
dcc.Interval(
id='interval-component',
interval=1*2000,
n_intervals=1
)
])
)
@app.callback(Output('live-update-graph', 'figure'),
Input('interval-component', 'n_intervals'))
def update_graph_live(n):
fig = go.Figure(go.Scattergeo())
fig.update_geos(projection_type="orthographic")
# hard-coded points, would like to receive from queue instead
df = pd.DataFrame({'lat': [0, 20], 'lon': [20, 0]}) 
fig.add_traces(data=go.Scattergeo(lat=df['lat'], lon=df['lon']))
return fig
if __name__ == '__main__':
app.run_server(debug=True)

根据我对它们的参考,这两个MWEs都经过了测试,并且工作得很好。然而,我一直有麻烦在生成数据的线程和绘制它的dash应用程序之间共享队列.

我已经尝试了很多东西,阅读了很多文档和例子,在这一点上,我很确定我误解了dash如何维护它的线程。因此,作为答案的一部分,我想了解一下dash在这方面是如何工作的。

此外,我应该注意,当我在app.run_server()之前启动CoordGetter时,它似乎启动了两次。也就是说,它的__init__()函数中的任何调试打印语句都输出两次,并且它第二次抱怨它打开的UDP端口已经在使用中。它开始两次,我希望它开始一次,这表明对dash如何处理线程的另一个误解,我想澄清一下。我想我的主要问题的一个好的答案也会澄清这一点,但我想我应该明确地提到它。

注:如果有帮助的话,我可以分享CoordSource,但它有点混乱,并且与外部硬件和软件都有交互。对于这个问题,我认为可以简单地说它是有效的,并将其视为一个黑匣子。我想这将是同样有效的测试在这里做一个小类,随机生成坐标,我也很乐意这样做,使问题更加独立。

编辑:下面的类大致模仿CoordSource.CoordGetter的行为:

from random import randrange, uniform
from time import sleep
class CoordGetter():
def __init__(self, queue):
self.queue = queue
self.generate()
def generate(self):
while True:
sleep(randrange(10))
for i in range(1, randrange(10)):
lat = uniform(-90,90)
lon = uniform(-180,180)
self.queue.put((lat, lon))

TLDR:我如何从CoordSource.CoordGetter(第三个例子)获得数据来绘制,因为它使用Scattergeo(第二个例子)实时更新?创建一个简单的线程脚本(如第一个示例)不会成功。

尝试将标题设置为全局

为例:

import threading
from queue import Queue
from CoordSource import CoordGetter
class Receiver(object):
def __init__(self, q):
while True:
item = q.get()
print("RECEIVED", item)
if __name__ == '__main__':
mainQueue = Queue()
global cg
global rt
cg = threading.Thread(target=CoordGetter, args=(mainQueue, ))
cg.start()
rt = threading.Thread(target=Receiver, args=(mainQueue, ))
rt.start()

最新更新