为什么 python 客户端没有收到 SSE 事件



我有一个python客户端从带有node的服务器监听SSE事件.js API

流程是我通过call_notification.py向节点.js API 发送事件,并使用run.sh循环运行seevents.py(见下文)

但是我没有看到 python 客户端正在接收此 SSE 事件? 关于为什么会这样的任何指导?

call_notification.py

import requests
input_json = {'BATS':'678910','root_version':'12A12'}
url = 'http://company.com/api/root_event_notification?params=%s'%input_json
response = requests.get(url)
print response.text

节点.js API

app.get("/api/root_event_notification", (req, res, next) => {
console.log(req.query.params)
var events = require('events');
var eventEmitter = new events.EventEmitter();
//Create an event handler:
var myEventHandler = function () {
console.log('new_root_announced!');
res.status(200).json({
message: "New root build released!",
posts: req.query.params
});
}

seevents.py(侦听 SSE 事件的 python 客户端)

import json
import pprint
import sseclient
def with_urllib3(url):
"""Get a streaming response for the given event feed using urllib3."""
import urllib3
http = urllib3.PoolManager()
return http.request('GET', url, preload_content=False)
def with_requests(url):
"""Get a streaming response for the given event feed using requests."""
import requests
return requests.get(url, stream=True)
url = 'http://company.com/api/root_event_notification'
response = with_urllib3(url)  # or with_requests(url)
client = sseclient.SSEClient(response)
#print client.events()
for event in client.events():
print "inside"
pprint.pprint(json.loads(event.data))

run.sh

#!/bin/sh
while [ /usr/bin/true ]
do
echo "Running sseevents.py"
python sseevents.py  2>&1 | tee -a sseevents.log.txt
echo "sleeping for 30 sec"
sleep 30
done

输出:-

Run call_notification.py on Terminal
node.js API OUTPUT
new_root_announced!
{'root_version': 'ABCD', 'BATS': '143'}
./run.sh --> DON'T SEE ABOVE EVENT below
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec

非常简短地回答您的问题:

服务器代码未将 SSE 消息发送回客户端。

为什么?因为您需要遵循 SSE 格式。

根据Jason BUTZ在服务器发送的事件与节点

您应该发送Connectionkeep-alive标头,以确保客户端也保持连接打开。应发送带有值no-cacheCache-Control标头,以阻止缓存数据。最后,需要将Content-Type设置为text/event-stream

完成所有这些操作后,应将换行符 () 发送到客户端,然后可以发送事件。 事件必须作为字符串发送,但该字符串中的内容无关紧要。JSON 字符串非常好。

事件数据必须以"data: <DATA TO SEND HERE>n"格式发送。

请务必注意,每行末尾应为换行符。为了表示事件的结束,还需要添加额外的换行符。

多条数据线完全没问题。

详细回答您的问题:

根据埃里克·比德尔曼(Eric Bidelman)在 html5rocks.com:

使用 SSE 进行通信时,服务器可以随时将数据推送到您的应用程序,而无需发出初始请求。换句话说,更新可以在发生时从服务器流式传输到客户端。

但是,为了实现这一点,客户端必须通过请求它来"开始"并准备接收消息(当它们发生时)。

  • "启动"是通过调用 SSE API 端点(在您的情况下,调用 Node.js API 代码)来完成的。
  • 准备
  • 是通过准备处理异步消息流来完成的。

SSE 在服务器和客户端之间打开单个单向通道

* 重点是我的

这意味着服务器具有到客户端的"直接"通道。它不打算由不是"客户端"代码的其他进程/代码"启动"(打开)。

假设从OP评论...

预期行为(详细)

  1. 客户端 Alice 使用参数调用 API 端点{name: "Alice"},没有任何(可见)发生。

  2. 。然后客户端 Bob 调用带有参数的 API 端点{name: "Bob"},客户端 Alice 收到一个有效负载{name: "Bob", says: "Hi"}的 SSE。

  3. 。然后客户端 Carol 调用带有参数的 API 端点{name: "Carol"},客户端 Alice 和 Bob 各自收到一个有效负载{name: "Carol", says: "Hi"}的 SSE。

  4. 。等等。每次新客户端使用 params 调用 API 端点时,通道"打开"的所有其他客户端都将收到具有新"Hi"有效负载的 SSE。

  5. 。然后客户端 Bob 与服务器"断开连接",客户端 Alice、客户端 Carol 和所有具有"打开"通道的客户端将收到有效负载{name: "Bob", says: "Bye"}的 SSE。

  6. 。等等。每次旧客户端与服务器"断开连接"时,具有"打开"通道的所有其他客户端都将收到具有新"再见"有效负载的 SSE。

抽象行为

  • 每个要求"打开"发送一些参数的通道或旧客户端与服务器"断开连接"的新客户端,它们会导致服务器中的事件。
  • 每次服务器中发生此类事件时,服务器都会向所有"开放"通道发送带有参数的 SSE 消息和作为有效负载的消息。

关于阻塞的说明 每个具有"开放"通道的客户端都将"卡住"在无限等待事件发生的循环中。客户端设计有责任使用"线程"代码技术来避免阻塞。

法典

您的 Python 客户端应该"要求"启动单个单向通道并继续等待,直到通道关闭。不应该结束并使用不同的频道重新开始。它应该保持相同的通道打开。

从网络的角度来看,它就像一个不会结束的"长"响应(直到 SSE 消息传递结束)。回应只是"不断来来去"。

你的 Python 客户端代码就是这样做的。我注意到它是 sseclient-py 库中使用的确切示例代码。

Python 3.4 的客户端代码

若要包含要发送到服务器的参数,请使用Requests库 docs/#passing-parameters-in-url 中的一些代码。

因此,混合这些示例,我们最终会得到以下代码作为您的 Python 3.4 客户端:

import json
import pprint
import requests
import sseclient # sseclient-py
# change the name for each client
input_json = {'name':'Alice'}
#input_json = {'name':'Bob'}
#input_json = {'name':'Carol'}
url = 'http://company.com/api/root_event_notification'
stream_response = requests.get(url, params=input_json, stream=True)
client = sseclient.SSEClient(stream_response)
# Loop forever (while connection "open")
for event in client.events():
print ("got a new event from server")
pprint.pprint(event.data)

Python 2.7 的客户端代码

若要包含要发送到服务器的参数,请使用库将它们在 URL 中编码为查询参数urllib.urlencode()

使用urllib3.PoolManager().request()发出 http 请求,这样您最终会得到流响应。

请注意,sseclient库以 unicode 字符串的形式返回事件数据。要将 JSON 对象转换回 python 对象(带有 python 字符串),请使用byteify,一个递归自定义函数(感谢 Mark Amery )。

使用以下代码作为 Python 2.7 客户端:

import json
import pprint
import urllib
import urllib3
import sseclient # sseclient-py
# Function that returns byte strings instead of unicode strings
# Thanks to:
# [Mark Amery](https://stackoverflow.com/users/1709587/mark-amery)
def byteify(input):
if isinstance(input, dict):
return {byteify(key): byteify(value)
for key, value in input.iteritems()}
elif isinstance(input, list):
return [byteify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
else:
return input
# change the name for each client
input_json = {'name':'Alice'}
#input_json = {'name':'Bob'}
#input_json = {'name':'Carol'}
base_url = 'http://localhost:3000/api/root_event_notification'
url = base_url + '?' + urllib.urlencode(input_json)
http = urllib3.PoolManager()
stream_response = http.request('GET', url, preload_content=False)
client = sseclient.SSEClient(stream_response)
# Loop forever (while connection "open")
for event in client.events():
print ("got a new event from server")
pprint.pprint(byteify(json.loads(event.data)))

现在,服务器代码应该:

  1. 发出服务器内部的"hello"事件,以便其他客户端侦听该事件
  2. "打开"通道
  3. 注册以侦听所有可能发生的内部服务器事件(这意味着,保持通道"打开"并且不会在消息之间发送任何内容,只是保持通道"打开")。
    • 这包括发出服务器内部的"再见"事件,以便其他客户端在客户端/网络关闭通道时侦听事件(最后"结束")。

使用以下节点.js API 代码:

var EventEmitter = require('events').EventEmitter;
var myEmitter = new EventEmitter;

function registerEventHandlers(req, res) {
// Save received parameters
const myParams = req.query;
// Define function that adds "Hi" and send a SSE formated message
const sayHi = function(params) {
params['says'] = "Hi";
let payloadString = JSON.stringify(params);
res.write(`data: ${payloadString}nn`);
}
// Define function that adds "Bye" and send a SSE formated message
const sayBye = function(params) {
params['says'] = "Bye";
let payloadString = JSON.stringify(params);
res.write(`data: ${payloadString}nn`);
}
// Register what to do when inside-server 'hello' event happens
myEmitter.on('hello', sayHi);
// Register what to do when inside-server 'goodbye' event happens
myEmitter.on('goodbye', sayBye);
// Register what to do when this channel closes
req.on('close', () => {
// Emit a server 'goodbye' event with "saved" params
myEmitter.emit('goodbye', myParams);
// Unregister this particular client listener functions
myEmitter.off('hello', sayHi);
myEmitter.off('goodbye', sayBye);
console.log("<- close ", req.query);
});
}

app.get("/api/root_event_notification", (req, res, next) => {
console.log("open -> ", req.query);
// Emit a inside-server 'hello' event with the received params
myEmitter.emit('hello', req.query);
// SSE Setup
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
});
res.write('n');
// Register what to do when possible inside-server events happen
registerEventHandlers(req, res);
// Code execution ends here but channel stays open
// Event handlers will use the open channel when inside-server events happen
})

。html5rocks.com 继续引用埃里克·比德尔曼(Eric Bidelman)的话:

从源发送事件流是构造纯文本响应的问题,该响应与遵循 SSE 格式的文本/事件流内容类型一起提供。在其基本形式中,响应应包含一个"data:"行,后跟您的消息,后跟两个""字符以结束流

在客户端代码中,sseclient-py 库负责解释 SSE 格式,因此每次两个""字符到达时,库都会"迭代"一个新的"可迭代"对象(一个新事件),该对象具有data属性,其中包含从服务器发送的消息。

这就是我测试代码的方式

  1. 使用 Node.js API 代码启动服务器
  2. 运行仅未注释"Alice"行的客户端(在此客户端控制台上尚未看到任何内容)。
  3. 运行第二个客户端,其中只有"Bob"行未注释。第一个客户端"Alice"的控制台显示:Bob 说"嗨"(在 Bob的客户端控制台上还没有看到任何内容)。
  4. 运行第三个客户端,其中只有"Carol"行未注释。爱丽丝和鲍勃的控制台显示:卡罗尔说"嗨">(卡罗尔的客户端控制台上还没有看到任何东西)。
  5. 停止/杀死鲍勃的客户。爱丽丝和卡罗尔的游戏机显示:鲍勃说"再见"。

因此,代码可以正常工作:)

最新更新