在 Node.js 中发出 HTTP 请求并接收多部分/x 混合替换响应



我需要向外部服务器发出HTTP GET请求才能开始接收事件。请求后,我立即得到multipart/x-mixed-replace回复。当事件发生时,它将作为 XML 消息与指示此部分结束的边界一起发送。

现在我必须在 Node.js 中实现它。在正常请求下,我使用 node-rest-client ,调用其 get() 方法并将我的逻辑放在方法的回调中。麻烦的是,回调仅在响应完成时才执行,multipart/x-mixed-replace直到连接关闭时才执行。

有没有其他 NPM 模块可以解决问题?我搜索了NPM注册表,但我发现的结果似乎不适合该任务。还是在纯节点中这样做更好?因此,请举个例子。

这是我自己的实现:

const webStream = {
    get: function (url, callback) {
        let webClient;
        if (url.startsWith("http://")) {
            webClient = require("http");
        } else if (url.startsWith("https://")) {
            webClient = require("https");
        } else {
            throw "Unsupported protocol.";
        }
        let clientRequest = webClient.get(url, function (response) {
            let context = {
                url: url,
                boundary: "",
                contentType: "",
                contentLength: 0
            };
            let headersCompleted = false;
            let bodyCompleted = false;
            let buffer = null;
            let receivedBodyChunk = 0;
            response.on("data", function (chunk) {
                if (!headersCompleted) {
                    let headers = chunk.toString().split(/r?n/);
                    context.boundary = headers[0].substring(2);
                    context.contentType = headers[1].split(":")[1].trim();
                    context.contentLength = parseInt(headers[2].split(":")[1]);
                    buffer = Buffer.alloc(context.contentLength);
                    headersCompleted = true;
                } else {
                    if (!bodyCompleted) {
                        if (receivedBodyChunk < context.contentLength) {
                            chunk.copy(buffer, receivedBodyChunk, 0, chunk.byteLength);
                            receivedBodyChunk += chunk.byteLength;
                            if (receivedBodyChunk === context.contentLength) {
                                bodyCompleted = true;
                            }
                        }
                    }
                    if (bodyCompleted) {
                        callback(buffer, context);
                        headersCompleted = false;
                        bodyCompleted = false;
                        buffer = null;
                        receivedBodyChunk = 0;
                    }
                }
            });
        });
        return {
            url: url,
            handler: clientRequest,
            on: function (type, listener) {
                clientRequest.on(type, listener);
            },
            abort: function () {
                clientRequest.abort();
            }
        };
    }
};
let stream = webStream.get("http://127.0.0.1:8090/", function (data, context) {
    // data: Received content (Buffer)
    // context: { url, boundary, contentType, contentLength }
    // TODO: Do something here...
});
// stream.abort();
// stream.on("error", function(e) {
//     console.log("Error: " + e.message);
// });

最新更新