存储javascript承诺



我对Node还很陌生,决定将我的一个代码库转换为async/await。将回调转换为Async/await是一个相对容易的过程,但我遇到了将回调存储在队列中的问题。

this.on("request", (requestString, response) => {
// Check if there are no available workers
if (this.freeWorkers.length === 0) {
logger.info("Workers are busy please wait...");
this.queue.push([requestString, response]);
} else {
const worker = this.freeWorkers.pop();
/* Run the worker and send back the response */
worker.run(requestString, response, data => {
response(data); 
});
}
});

我遇到的问题是,如果不必要的话,我不想通过使用新的Promise(resolve,reject(来打破异步/等待。上面的版本运行得很好,因为当this.queue中有请求时,另一个事件"return"将完成存储的回调。异步版本如下:

async request (requestString) {
if (this.freeWorkers.length === 0) {
logger.info("Workers are busy please wait...");
this.queue.push([requestString, /* How do I store the promise here? */]);
} else {
logger.info("sending request...");
const worker = this.freeWorkers.pop();
/* Run the worker and send back the response */
return await worker.run(requestString);
}
}

问题是如何从async/await函数内部将promise存储在这个.requeue((中?

编辑:根据要求添加一些额外的细节,希望能澄清一些困惑。

"API",如果你可以叫它的话,它来自1982年的一个古老的IBM软件,我公司内部使用。this.on("request"是从我的API中的另一个类发出的。如果你好奇,这就是工人目前的样子:

/**
* Class Worker is a simple class with only a constructor and 1 run method
* sends a plain TCP request
* Uses a callback response and closes the connection
* */
class Worker {
/**
* Sets variables that it will use in the future
*
* @param {number} port
* @param {TracsRequest} eventEmitter
* */
constructor(port, eventEmitter) {
this.host = /* redacted */;
this.port = port;
this.emitter = eventEmitter;
this.lastRequest = null;
this.lastResponse = null;
}
/**
* Takes in a formatted request string and opens a TCP Port to parse the request
*
* @param requestString
* @param response
*/
async run(requestString) {
this.lastRequest = requestString;
this.lastResponse = response;
const socket = new net.Socket();
let tempBuffer = null;
const client = socket.connect(
this.port,
this.host,
() => {
client.write(requestString);
}
);

let socketPromise = new Promise((resolve, reject) => {
/**************************
* SOCKET EVENT LISTENERS *
**************************/
// store the incoming data
client.on("data", data => {
tempBuffer += data;
});
// client has finished respond with the data
client.on("end", () => {
logger.info("the client has finished");
this.emitter.emit("return", this);
client.destroy();
tempBuffer = tempBuffer.substring(4);
resolve(tempBuffer);
});
// Client has responded with an error send the worker back to the Request class
client.on("error", error => {
logger.error(`OHH snap he's dead Jim:${error}`);
reject(error);
});
});
return await socketPromise;
}

}`

您还没有向我们展示将工作人员重新添加到池和/或从工作队列中提取的代码,我将其称为replenish。无论发生什么,都需要调用一个函数(response(或完成一个promise(可能是通过调用从new Promise构造函数保存的resolve(。毕竟,即使您可以存储Promise,其想法也是,您不能在外部仅引用Promise来解决问题——您确实需要保留对resolve的引用才能更改Promise的状态。

我会在request中调用new Promise(),保存resolve函数,并将其添加到队列中。这意味着您的worker-replacer和队列检查器函数replenish不需要处理Promises,它只需要在完成工作时处理调用该函数。

在做了更多的挖掘之后,我能够想出一个可以工作的异步示例。这个例子确实使用了new Promise(),我认为这是实现我所要求的最好的方法,因为async/await没有可以存储到数组中的解析函数。

async request (requestString) {
let data = false;
try {
if (this.freeWorkers.length === 0) {
logger.info("Workers are busy please wait...");
let queuePromise = new Promise((resolve, reject) => {
this.queue.push([requestString, resolve, reject]);
});
/* await for the queue to run this request */
data = await queuePromise;
} else {
logger.info("sending request...");
const worker = this.freeWorkers.pop();
/* Run the worker and send back the response */
data = await worker.run(requestString);
}   
} catch (error) {
logger.error("Something went wrong in request");
throw error;
}
return data;
}

感谢这个问题的另一个答案引导我朝着正确的方向前进!我还用try/catch格式化了函数,这样就不会处理错误。

最新更新