显示Jupyter中Redis订阅的流式数据帧



我在Redis中有一个Redis pub子频道"价格更新",发布者为其设置股价更新。我想显示一个流式网格,在网格末尾不断添加价格更新。

到目前为止,我已经创建了一个我想做的非工作版本

from streamz import Stream
from streamz.dataframe import DataFrame
source = Stream()
data = []
def handler(message):
json_data = json.loads(message['data'])
df = pd.DataFrame.from_dict([json_data]).set_index('sym')
source.map(handler).sink(data.append)
sdf = DataFrame(source)
## Run this in a different thread 
p.subscribe('price-updates')
while True:
message = p.get_message()
if message:
source.emit(message)
time.sleep(0.001)
## end of thread block

#displayStreamingDataGrid(sdf)

如果有一个在sdf方面更有经验的人能帮我做这件事,我将不胜感激。

我能够在没有流的情况下做到这一点。然而,我并没有得到一个更新到位的流式网格,而是显示和清除输出,这很烦人。

这是一个jupyter细胞的输出窗口小部件

import ipywidgets as iw
from IPython.display import display 
o = iw.Output()
def output_to_widget(df, output_widget): 
output_widget.clear_output()
with output_widget: 
display(df)
o

以下是订阅redis并处理消息的代码

import redis, json, time
r = redis.StrictRedis(host = HOST, password = PASS, port = PORT, db = DB)
p = r.pubsub(ignore_subscribe_messages=True)
p.subscribe('QUOTES')
mdf = pd.DataFrame()
while True:
message = p.get_message()
if message:
json_msg = json.loads(message['data'])
df = pd.DataFrame([json_msg]).set_index('sym')
mdf = mdf.append(df)
output_to_widget(mdf, o)
time.sleep(0.001)

您可以使用https://github.com/AaronWatters/jp_proxy_widget创建html表,您可以就地更新该表,而无需在更新之间明显清除该表。

我在这里放了一个笔记本示例:https://github.com/AaronWatters/jp_doodle/blob/master/notebooks/misc/In%20place%20html%20table%20update%20demo.ipynb

诀窍是创建一个小部件,显示一个表并附加修改表格的更新操作:

# Create a proxy widget with a table update method
import jp_proxy_widget
def updateable_table(headers, rows):
w = jp_proxy_widget.JSProxyWidget()
w.js_init("""
# injected javascript for the widget:
element.update_table = function(headers, rows) {
element.empty();
var table = $("<table border style='text-align:center'/>");
table.appendTo(element);
var header_row = $("<tr/>");
for (var i=0; i<headers.length; i++) {
$("<th style='text-align:center'>" + headers[i] + "</th>")
.width(50)
.appendTo(header_row);
}
header_row.appendTo(table);
for (var j=0; j<rows.length; j++) {
var table_row = $("<tr/>").appendTo(table);
var data_row = rows[j];
for (var i=0; i<data_row.length; i++) {
$("<td>" + data_row[i] + "</td>").appendTo(table_row);
}
}
}
element.update_table(headers, rows);
""", headers=headers, rows=rows)
return w
# show the widget
w = updateable_table(headers, rows)
w

更新小部件的代码

# Update the widget 20 times
import time
count = -20
for i in range(21):
time.sleep(1)
rows = [rows[-1]] + rows[:-1]  # rotate the rows
rows[0][0] = count  # change the upper left entry.
count += 1
w.element.update_table(headers, rows)

在没有可见擦除的情况下就地更新表。示例上面链接的笔记本还展示了如何使用pandas数据帧。

最新更新