在 Python3.4 中,如何从以事件流作为内容类型的 Web 请求中读取数据?流数据每 30 秒更新一次,对于每个数据流,我都会提取和处理数据。
例:
我在 http://example.com/value121 上执行了请求在标题中,我可以看到:内容类型:文本/事件流
每 30 秒,我可以看到:
名称:值121数据: {'old_value': xx, 'new_value': xx}
我会提取new_value。
在服务器和客户端上实现text/event-stream
相对容易。假设 example.com
处的服务器是在端口 80
(默认端口)上运行的简单服务器:
import http.server
import json
import socketserver
import time
class RequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
self.send_response(200)
self.send_header('Connection', 'Keep-Alive')
self.send_header('Content-Type', 'text/event-stream')
self.end_headers()
name = self.path.strip('/').encode('UTF-8')
old_value = None
new_value = None
while True:
old_value, new_value = new_value, time.time()
data = json.dumps({
'old_value': old_value,
'new_value': new_value,
}).encode('UTF-8')
self.wfile.write(b'name: ')
self.wfile.write(name)
self.wfile.write(b'rn')
self.wfile.write(b'data: ')
self.wfile.write(data)
self.wfile.write(b'rn')
self.wfile.write(b'rn')
self.wfile.flush()
time.sleep(30)
server = socketserver.TCPServer(('', 80), RequestHandler)
server.serve_forever()
每 30 秒,它输出一个事件:
name: value121<CR><LF>
data: {"old_value": <old-timestamp>, "new_value": <new-timestamp>}<CR><LF>
<CR><LF>
然后,客户端可以连接到端口 80
上的 http://example.com/value121
并分析每个事件:
import http.client
import json
connection = http.client.HTTPConnection('http://example.com', 80)
connection.request('GET', '/value121', headers={
'Accept': 'text/event-source',
'Connection': 'Keep-Alive',
})
with connection.getresponse() as response:
while not response.closed:
event = {}
for line in response:
line = line.decode('UTF-8')
if line == 'rn':
# End of event.
break
elif line.startswith(':'):
# Comment, ignore.
pass
else:
# Data line.
key, value = line.split(':', 1)
value = value.strip()
if key == 'data':
value = json.loads(value)
event[key] = value
# Handle event, extract values, etc.
print(event)
每 30 秒,客户端打印出:
'data': {'old_value': <old-timestamp>, 'new_value': <new-timestamp>}, 'name': 'value121'}
注意:这是一个非常基本的实现。有关详细信息,请参阅使用服务器发送的事件 § 事件流格式。
试一试sseclient
:它使用简单的迭代器样式接口处理 SSE 消息。