Node.js中有一个集群模块可以利用机器上所有可用的核心,这非常棒,尤其是在与节点模块pm2
一起使用时。但我对Deno的一些功能感到非常兴奋,但我一直想知道如何在多核机器上最好地运行它。
我知道有些工作人员在特定任务中工作得很好,但对于正常的web请求,多核机器的性能似乎有些浪费?在Deno获得硬件最大可用性和利用率的最佳策略是什么?
我有点担心,如果你只有一个进程在进行,并且由于任何原因都有一些CPU密集型任务,它会";块";所有其他请求都会进来。在node.js中,集群模块会解决这个问题,因为另一个进程会处理这个请求,但我不确定如何在Deno中处理这个问题?
我认为你可以在Deno的不同端口上运行几个实例,然后在它前面安装某种负载均衡器,但相比之下,这似乎是一个相当复杂的设置。我也知道你可以使用某种服务,比如Deno Deploy或其他什么,但我已经有了我想运行它的硬件
对我来说还有什么选择?提前感谢你明智的建议和更好的智慧。
在Deno中,就像在web浏览器中一样,您应该能够使用web Workers来100%利用多核CPU。
在集群中,您需要一个";经理;节点(根据需要/适当,它本身也可以是工作者)。以类似的方式,Web Worker API可以用于根据需要创建任意数量的专用工作者。这意味着主线程永远不应该阻塞,因为它可以将所有可能阻塞的任务委派给它的工作线程。不会阻塞的任务(例如,简单的数据库或其他I/O绑定调用)可以像往常一样直接在主线程上完成。
Deno还支持navigator.hardwareConcurrency
,因此您可以查询可用的硬件并相应地确定所需的工作人员数量。不过,您可能不需要定义任何限制。从与之前产生的专职工作者相同的来源产生一个新的专职工作者可能足够快,可以按需进行。即便如此,重用专门的工作人员可能还是有价值的,而不是为每个请求生成一个新的工作人员。
有了可传输对象,工人就可以在不复制数据的情况下使用大型数据集。这与消息传递一起,使委派任务变得非常简单,同时避免了复制大型数据集带来的性能瓶颈。
根据您的使用情况,您也可以使用类似Comlink"的库;这消除了思考postMessage
的心理障碍,并掩盖了你与员工一起工作的事实">
例如
main.ts
import { serve } from "https://deno.land/std@0.133.0/http/server.ts";
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
serve(async function handler(request) {
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
type: "module",
});
const handler = ComlinkRequestHandler.wrap(worker);
return await handler(request);
});
工人.ts
/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";
ComlinkRequestHandler.expose(async (request) => {
const body = await request.text();
return new Response(`Hello to ${request.url}nnReceived:nn${body}n`);
});
ComlinkRequestHandler.ts
import * as Comlink from "https://cdn.skypack.dev/comlink@4.3.1?dts";
interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
url: string;
headers: Record<string, string>;
hasBody: boolean;
}
interface ResponseMessage extends ResponseInit {
headers: Record<string, string>;
hasBody: boolean;
}
export default class ComlinkRequestHandler {
#handler: (request: Request) => Promise<Response>;
#responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;
static expose(handler: (request: Request) => Promise<Response>) {
Comlink.expose(new ComlinkRequestHandler(handler));
}
static wrap(worker: Worker) {
const { handleRequest, nextResponseBodyChunk } =
Comlink.wrap<ComlinkRequestHandler>(worker);
return async (request: Request): Promise<Response> => {
const requestBodyReader = request.body?.getReader();
const requestMessage: RequestMessage = {
url: request.url,
hasBody: requestBodyReader !== undefined,
cache: request.cache,
credentials: request.credentials,
headers: Object.fromEntries(request.headers.entries()),
integrity: request.integrity,
keepalive: request.keepalive,
method: request.method,
mode: request.mode,
redirect: request.redirect,
referrer: request.referrer,
referrerPolicy: request.referrerPolicy,
};
const nextRequestBodyChunk = Comlink.proxy(async () => {
if (requestBodyReader === undefined) return undefined;
const { value } = await requestBodyReader.read();
return value;
});
const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
requestMessage,
nextRequestBodyChunk
);
const responseBodyInit: BodyInit | null = responseHasBody
? new ReadableStream({
start(controller) {
async function push() {
const value = await nextResponseBodyChunk();
if (value === undefined) {
controller.close();
return;
}
controller.enqueue(value);
push();
}
push();
},
})
: null;
return new Response(responseBodyInit, responseInit);
};
}
constructor(handler: (request: Request) => Promise<Response>) {
this.#handler = handler;
}
async handleRequest(
{ url, hasBody, ...init }: RequestMessage,
nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
): Promise<ResponseMessage> {
const request = new Request(
url,
hasBody
? {
...init,
body: new ReadableStream({
start(controller) {
async function push() {
const value = await nextRequestBodyChunk();
if (value === undefined) {
controller.close();
return;
}
controller.enqueue(value);
push();
}
push();
},
}),
}
: init
);
const response = await this.#handler(request);
this.#responseBodyReader = response.body?.getReader();
return {
hasBody: this.#responseBodyReader !== undefined,
headers: Object.fromEntries(response.headers.entries()),
status: response.status,
statusText: response.statusText,
};
}
async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
if (this.#responseBodyReader === undefined) return undefined;
const { value } = await this.#responseBodyReader.read();
return value;
}
}
示例用法:
% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar
Received:
{"answer":42}
可能有更好的方法可以做到这一点(例如,通过Comlink.transferHandlers
和注册Request
、Response
和/或ReadableStream
的传输处理程序),但想法是相同的,并且在主体通过消息传输时,将处理甚至大的请求或响应有效载荷。
这一切都取决于您希望推送到线程的工作负载。如果你对在主线程上运行的内置Deno HTTP服务器的性能感到满意,但你需要利用多线程来更有效地创建响应,那么它就像Deno v1.29.4一样简单。
HTTP服务器将为您提供一个类似的异步迭代器server
import { serve } from "https://deno.land/std/http/server.ts";
const server = serve({ port: 8000 });
然后您可以使用类似的内置功能pooledMap
import { pooledMap } from "https://deno.land/std@0.173.0/async/pool.ts";
const ress = pooledMap( window.navigator.hardwareConcurrency - 1
, server
, req => new Promise(v => v(respondWith(req))
);
for await (const res of ress) {
// respond with res
}
其中respondWith
只是一个处理接收到的请求并生成响应对象的函数。如果respondWith
已经是一个异步函数,那么您甚至不需要将它包装成promise。
然而,如果你想在不同的therad上运行多个Deno HTTP服务器,那么这也是可能的,但你需要一个像GoBetween这样的负载均衡器。在这种情况下,您应该在单独的线程中实例化多个DenoHTTP服务器,并在主线程中作为单独的异步迭代器接收它们的请求集。为了实现这一点,每个线程你可以做喜欢;
在工人侧,即./servers/server_800X.ts
;
import { serve } from "https://deno.land/std/http/server.ts";
const server = serve({ port: 800X });
console.log("Listening on http://localhost:800X/");
for await (const req of server) {
postMessage({ type: "request", req });
}
在主线程中,您可以轻松地将相应的worker http服务器转换为类似的异步迭代器
async function* server_800X() {
worker_800X.onmessage = event => {
if (event.data.type === "request") {
yield event.data.req;
}
};
}
for await (const req of server_800X()) {
// Handle the request here in the main thread
}
您还应该能够通过在中使用MuxAsyncIterators功能将HTTP(req)或res异步迭代器复用到单个流中,然后由pooledMap
生成。因此,如果您有两个http服务器在server_8000.ts
和server_8001.ts
上工作,那么您可以将它们多路复用到一个类似的异步迭代器中
const muxedServer = new MuxAsyncIterator<Request>();
muxedServer.add(server_8000);
muxedServer.add(server_8001);
for await (const req of muxedServer) {
// repond accordingly(*)
}
显然,您还应该能够通过使用pooledMap
生成新的线程来处理从muxedServer
接收的请求,如上所示。
(*)如果您选择使用一个负载均衡器和多个Deno-http服务器,那么您应该在负载均衡器上为请求分配特殊的标头,指定它被转移到的服务器ID。这样,通过检查此特定标头,您可以决定从哪个服务器响应任何特定请求。