为什么 python 客户端没有收到 SSE 事件

我有一个python客户端从带有node的服务器监听SSE事件.js API

流程是我通过call_notification.py向节点.js API 发送事件,并使用run.sh循环运行seevents.py(见下文)

但是我没有看到 python 客户端正在接收此 SSE 事件? 关于为什么会这样的任何指导?


import requests
input_json = {'BATS':'678910','root_version':'12A12'}
url = 'http://company.com/api/root_event_notification?params=%s'%input_json
response = requests.get(url)
print response.text

节点.js API

app.get("/api/root_event_notification", (req, res, next) => {
var events = require('events');
var eventEmitter = new events.EventEmitter();
//Create an event handler:
var myEventHandler = function () {
message: "New root build released!",
posts: req.query.params

seevents.py(侦听 SSE 事件的 python 客户端)

import json
import pprint
import sseclient
def with_urllib3(url):
"""Get a streaming response for the given event feed using urllib3."""
import urllib3
http = urllib3.PoolManager()
return http.request('GET', url, preload_content=False)
def with_requests(url):
"""Get a streaming response for the given event feed using requests."""
import requests
return requests.get(url, stream=True)
url = 'http://company.com/api/root_event_notification'
response = with_urllib3(url)  # or with_requests(url)
client = sseclient.SSEClient(response)
#print client.events()
for event in client.events():
print "inside"


while [ /usr/bin/true ]
echo "Running sseevents.py"
python sseevents.py  2>&1 | tee -a sseevents.log.txt
echo "sleeping for 30 sec"
sleep 30


Run call_notification.py on Terminal
node.js API OUTPUT
{'root_version': 'ABCD', 'BATS': '143'}
./run.sh --> DON'T SEE ABOVE EVENT below
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec
Running sseevents.py
sleeping for 30 sec


服务器代码未将 SSE 消息发送回客户端。

为什么?因为您需要遵循 SSE 格式。

根据Jason BUTZ在服务器发送的事件与节点


完成所有这些操作后,应将换行符 () 发送到客户端,然后可以发送事件。 事件必须作为字符串发送,但该字符串中的内容无关紧要。JSON 字符串非常好。

事件数据必须以"data: <DATA TO SEND HERE>n"格式发送。




根据埃里克·比德尔曼(Eric Bidelman)在 html5rocks.com:

使用 SSE 进行通信时,服务器可以随时将数据推送到您的应用程序,而无需发出初始请求。换句话说,更新可以在发生时从服务器流式传输到客户端。


  • "启动"是通过调用 SSE API 端点(在您的情况下,调用 Node.js API 代码)来完成的。
  • 准备
  • 是通过准备处理异步消息流来完成的。

SSE 在服务器和客户端之间打开单个单向通道

* 重点是我的




  1. 客户端 Alice 使用参数调用 API 端点{name: "Alice"},没有任何(可见)发生。

  2. 。然后客户端 Bob 调用带有参数的 API 端点{name: "Bob"},客户端 Alice 收到一个有效负载{name: "Bob", says: "Hi"}的 SSE。

  3. 。然后客户端 Carol 调用带有参数的 API 端点{name: "Carol"},客户端 Alice 和 Bob 各自收到一个有效负载{name: "Carol", says: "Hi"}的 SSE。

  4. 。等等。每次新客户端使用 params 调用 API 端点时,通道"打开"的所有其他客户端都将收到具有新"Hi"有效负载的 SSE。

  5. 。然后客户端 Bob 与服务器"断开连接",客户端 Alice、客户端 Carol 和所有具有"打开"通道的客户端将收到有效负载{name: "Bob", says: "Bye"}的 SSE。

  6. 。等等。每次旧客户端与服务器"断开连接"时,具有"打开"通道的所有其他客户端都将收到具有新"再见"有效负载的 SSE。


  • 每个要求"打开"发送一些参数的通道或旧客户端与服务器"断开连接"的新客户端,它们会导致服务器中的事件。
  • 每次服务器中发生此类事件时,服务器都会向所有"开放"通道发送带有参数的 SSE 消息和作为有效负载的消息。

关于阻塞的说明 每个具有"开放"通道的客户端都将"卡住"在无限等待事件发生的循环中。客户端设计有责任使用"线程"代码技术来避免阻塞。


您的 Python 客户端应该"要求"启动单个单向通道并继续等待,直到通道关闭。不应该结束并使用不同的频道重新开始。它应该保持相同的通道打开。

从网络的角度来看,它就像一个不会结束的"长"响应(直到 SSE 消息传递结束)。回应只是"不断来来去"。

你的 Python 客户端代码就是这样做的。我注意到它是 sseclient-py 库中使用的确切示例代码。

Python 3.4 的客户端代码

若要包含要发送到服务器的参数,请使用Requests库 docs/#passing-parameters-in-url 中的一些代码。

因此,混合这些示例,我们最终会得到以下代码作为您的 Python 3.4 客户端:

import json
import pprint
import requests
import sseclient # sseclient-py
# change the name for each client
input_json = {'name':'Alice'}
#input_json = {'name':'Bob'}
#input_json = {'name':'Carol'}
url = 'http://company.com/api/root_event_notification'
stream_response = requests.get(url, params=input_json, stream=True)
client = sseclient.SSEClient(stream_response)
# Loop forever (while connection "open")
for event in client.events():
print ("got a new event from server")

Python 2.7 的客户端代码

若要包含要发送到服务器的参数,请使用库将它们在 URL 中编码为查询参数urllib.urlencode()

使用urllib3.PoolManager().request()发出 http 请求,这样您最终会得到流响应。

请注意,sseclient库以 unicode 字符串的形式返回事件数据。要将 JSON 对象转换回 python 对象(带有 python 字符串),请使用byteify,一个递归自定义函数(感谢 Mark Amery )。

使用以下代码作为 Python 2.7 客户端:

import json
import pprint
import urllib
import urllib3
import sseclient # sseclient-py
# Function that returns byte strings instead of unicode strings
# Thanks to:
# [Mark Amery](https://stackoverflow.com/users/1709587/mark-amery)
def byteify(input):
if isinstance(input, dict):
return {byteify(key): byteify(value)
for key, value in input.iteritems()}
elif isinstance(input, list):
return [byteify(element) for element in input]
elif isinstance(input, unicode):
return input.encode('utf-8')
return input
# change the name for each client
input_json = {'name':'Alice'}
#input_json = {'name':'Bob'}
#input_json = {'name':'Carol'}
base_url = 'http://localhost:3000/api/root_event_notification'
url = base_url + '?' + urllib.urlencode(input_json)
http = urllib3.PoolManager()
stream_response = http.request('GET', url, preload_content=False)
client = sseclient.SSEClient(stream_response)
# Loop forever (while connection "open")
for event in client.events():
print ("got a new event from server")


  1. 发出服务器内部的"hello"事件,以便其他客户端侦听该事件
  2. "打开"通道
  3. 注册以侦听所有可能发生的内部服务器事件(这意味着,保持通道"打开"并且不会在消息之间发送任何内容,只是保持通道"打开")。
    • 这包括发出服务器内部的"再见"事件,以便其他客户端在客户端/网络关闭通道时侦听事件(最后"结束")。

使用以下节点.js API 代码:

var EventEmitter = require('events').EventEmitter;
var myEmitter = new EventEmitter;

function registerEventHandlers(req, res) {
// Save received parameters
const myParams = req.query;
// Define function that adds "Hi" and send a SSE formated message
const sayHi = function(params) {
params['says'] = "Hi";
let payloadString = JSON.stringify(params);
res.write(`data: ${payloadString}nn`);
// Define function that adds "Bye" and send a SSE formated message
const sayBye = function(params) {
params['says'] = "Bye";
let payloadString = JSON.stringify(params);
res.write(`data: ${payloadString}nn`);
// Register what to do when inside-server 'hello' event happens
myEmitter.on('hello', sayHi);
// Register what to do when inside-server 'goodbye' event happens
myEmitter.on('goodbye', sayBye);
// Register what to do when this channel closes
req.on('close', () => {
// Emit a server 'goodbye' event with "saved" params
myEmitter.emit('goodbye', myParams);
// Unregister this particular client listener functions
myEmitter.off('hello', sayHi);
myEmitter.off('goodbye', sayBye);
console.log("<- close ", req.query);

app.get("/api/root_event_notification", (req, res, next) => {
console.log("open -> ", req.query);
// Emit a inside-server 'hello' event with the received params
myEmitter.emit('hello', req.query);
// SSE Setup
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
// Register what to do when possible inside-server events happen
registerEventHandlers(req, res);
// Code execution ends here but channel stays open
// Event handlers will use the open channel when inside-server events happen

。html5rocks.com 继续引用埃里克·比德尔曼(Eric Bidelman)的话:

从源发送事件流是构造纯文本响应的问题,该响应与遵循 SSE 格式的文本/事件流内容类型一起提供。在其基本形式中,响应应包含一个"data:"行,后跟您的消息,后跟两个""字符以结束流

在客户端代码中,sseclient-py 库负责解释 SSE 格式,因此每次两个""字符到达时,库都会"迭代"一个新的"可迭代"对象(一个新事件),该对象具有data属性,其中包含从服务器发送的消息。


  1. 使用 Node.js API 代码启动服务器
  2. 运行仅未注释"Alice"行的客户端(在此客户端控制台上尚未看到任何内容)。
  3. 运行第二个客户端,其中只有"Bob"行未注释。第一个客户端"Alice"的控制台显示:Bob 说"嗨"(在 Bob的客户端控制台上还没有看到任何内容)。
  4. 运行第三个客户端,其中只有"Carol"行未注释。爱丽丝和鲍勃的控制台显示:卡罗尔说"嗨">(卡罗尔的客户端控制台上还没有看到任何东西)。
  5. 停止/杀死鲍勃的客户。爱丽丝和卡罗尔的游戏机显示:鲍勃说"再见"。

