我已经使用Kafkajs创建了Apache Kafka的客户端,我正在尝试从Kafka中的一个主题中读取消息。如果我console.log(消息(,它可以正常工作。但我想每次在主题中产生/写入新消息时,消费者都会从生产者那里听到消息,同时保持连接。
// function, which is being called whenever it's specified route is being requested
async readMessage(req, res, next, consumer) {
const resMessage = {};
res.writeHead(200, {'Content-Type': 'text/plain'});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
res.write(message.value.toString());
},
});
// res.send(resMessage);
}
但在我将数据发送到express.js服务器后,res.write((不会将数据发送给客户端(我使用Postman作为Node.js客户端(。在调用res.end((之前,如何刷新res.write((中写入的数据?
res.write()
确实会在您调用它时立即发送数据(Express或nodejs没有缓冲(。我在调试器中对它进行了跟踪,并看到它毫无延迟地发送数据。它可能会被Nagle算法服务的操作系统短暂缓冲,但这只是一个很短的延迟(毫秒(。因此,数据被发送到客户端。更有可能的是,您的问题发生在接收数据的http客户端上。
大多数http客户端都是为预期请求/响应而构建的。发送一个请求,等待整个响应,然后通知调用者。因此,如果您想在http流到达时从中获取常规数据,那么您将需要一个非传统的http客户端,它会在数据到达时通知您。您可能还需要发明某种协议,让客户端知道完整的数据包何时到达,因为数据包可以以随机方式分块或分组。
可能比使用http更容易的是使用一个实际的基于消息的协议,如webSockets或socket.io,它是专门为你想要做的事情而设计的。客户端将与服务器建立连接,然后服务器可以随时向客户端发送消息。webSocket或socket.io是基于消息的,因此它们已经为您完成了所有的工作,包括描绘消息、将消息打包、传递、拆包以及通知收件人消息已经到达。这正是webSocket/socket.io的设计初衷。
对于单向消息发送(从服务器到客户端(,您还可以使用作为http扩展的服务器发送事件。