Adding a function to p-queue to handle concurrency stops the queue

I'm using p-queue together with Puppeteer. The goal is to run X Chrome instances, with p-queue limiting the concurrency. When an exception occurs inside a queued task, I want to re-queue that task. But when I do that, the queue stops.
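
For context, the post never shows how the queue itself is created. A minimal sketch of the setup being described (the concurrency of 3 matches what is mentioned further down; depending on the p-queue version, the class may need to be pulled off `.default`):

const PQueue = require('p-queue'); // on p-queue >= 6, use require('p-queue').default
const puppeteer = require('puppeteer');

// Limit the queue to 3 tasks running at the same time.
const queue = new PQueue({ concurrency: 3 });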

Here is what I have:

getAccounts is just a helper method that parses a JSON file. For each entry, I create a task and submit it to the queue.
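
getAccounts itself isn't shown; a minimal sketch of such a helper, assuming it is a method on the same class and that the file (hypothetically accounts.json) holds a plain JSON array of account objects:

const fs = require('fs');

// Hypothetical helper: reads ./accounts.json and returns the parsed array.
async getAccounts() {
  const data = await fs.promises.readFile('./accounts.json', 'utf8');
  return JSON.parse(data);
}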

async init() {
  let accounts = await this.getAccounts();
  accounts.map(async () => {
    await queue.add(() => this.test());
  });
  await queue.onIdle();
  console.log("ended, with count: " + this._count);
}

The test method:

async test() {
  this._count++;
  const browser = await puppeteer.launch({ headless: false });
  try {
    const page = await browser.newPage();
    await page.goto(this._url);
    if (Math.floor(Math.random() * 10) > 4) {
      throw new Error("Simulate error");
    }
    await browser.close();
  } catch (error) {
    await browser.close();
    await queue.add(() => this.test());
    console.log(error);
  }
}

If I run it without the await queue.add(() => this.test());, it works fine and limits the concurrency to 3. But with it, whenever a task enters the catch block, the current Chrome instance just stops.

It also never logs the error, and it never reaches console.log("ended, with count: " + this._count).

Is this a bug in the node module, or am I doing something wrong?
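
For reference, a likely explanation (an inference from p-queue's documented behavior, not something confirmed in the original post): queue.add() returns a promise that only resolves once the added task has run to completion. Awaiting it from inside a running task therefore keeps that task's concurrency slot occupied while the retry waits for a free slot; once all three running tasks are stuck in their catch blocks this way, the queue deadlocks. A sketch of a catch block that re-queues without awaiting:

} catch (error) {
  await browser.close();
  console.log(error);
  // Re-queue the task but do not await its completion here, so the
  // current task can finish and free its concurrency slot. onIdle()
  // still waits for the retry, because it is added to the queue
  // before this task's own promise resolves.
  queue.add(() => this.test());
}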

I suggest taking a look at the Apify SDK package, where you can simply use one of its helper classes to manage Puppeteer pages/browsers.

PuppeteerPool: it manages browser instances for you. If you set one page per browser, every new page will create a new browser instance.

const { PuppeteerPool } = require('apify'); // PuppeteerPool is exported by the Apify SDK

const puppeteerPool = new PuppeteerPool({
  maxOpenPagesPerInstance: 1,
});

const page1 = await puppeteerPool.newPage();
const page2 = await puppeteerPool.newPage();
const page3 = await puppeteerPool.newPage();

// ... do something with the pages ...

// Close all browsers.
await puppeteerPool.destroy();

Or PuppeteerCrawler, which is more powerful and comes with several options and helpers. You can manage the whole Puppeteer crawl with it. You can check out the PuppeteerCrawler example.

EDIT: Example of using PuppeteerCrawler with a concurrency of 10:

const Apify = require('apify');

Apify.main(async () => {
  // Apify.openRequestQueue() is a factory to get a preconfigured RequestQueue instance.
  // We add our first request to it - the initial page the crawler will visit.
  const requestQueue = await Apify.openRequestQueue();
  await requestQueue.addRequest({ url: 'https://news.ycombinator.com/' }); // Adds URLs you want to process

  // Create an instance of the PuppeteerCrawler class - a crawler
  // that automatically loads the URLs in headless Chrome / Puppeteer.
  const crawler = new Apify.PuppeteerCrawler({
    requestQueue,
    maxConcurrency: 10, // Set max concurrency
    puppeteerPoolOptions: {
      maxOpenPagesPerInstance: 1, // Set up just one page for one browser instance
    },
    // The function accepts a single parameter, which is an object with the following fields:
    // - request: an instance of the Request class with information such as URL and HTTP method
    // - page: Puppeteer's Page object (see https://pptr.dev/#show=api-class-page)
    handlePageFunction: async ({ request, page }) => {
      // Code you want to process on each page
    },
    // This function is called if the page processing failed more than maxRequestRetries+1 times.
    handleFailedRequestFunction: async ({ request }) => {
      // Code you want to process when handlePageFunction failed
    },
  });

  // Run the crawler and wait for it to finish.
  await crawler.run();
  console.log('Crawler finished.');
});

Example using a RequestList:

const Apify = require('apify');

Apify.main(async () => {
  const requestList = new Apify.RequestList({
    sources: [
      // Separate requests
      { url: 'http://www.example.com/page-1' },
      { url: 'http://www.example.com/page-2' },
      // Bulk load of URLs from file `http://www.example.com/my-url-list.txt`
      { requestsFromUrl: 'http://www.example.com/my-url-list.txt', userData: { isFromUrl: true } },
    ],
    persistStateKey: 'my-state',
    persistSourcesKey: 'my-sources',
  });

  // This call loads and parses the URLs from the remote file.
  await requestList.initialize();

  const crawler = new Apify.PuppeteerCrawler({
    requestList,
    maxConcurrency: 10, // Set max concurrency
    puppeteerPoolOptions: {
      maxOpenPagesPerInstance: 1, // Set up just one page for one browser instance
    },
    // The function accepts a single parameter, which is an object with the following fields:
    // - request: an instance of the Request class with information such as URL and HTTP method
    // - page: Puppeteer's Page object (see https://pptr.dev/#show=api-class-page)
    handlePageFunction: async ({ request, page }) => {
      // Code you want to process on each page
    },
    // This function is called if the page processing failed more than maxRequestRetries+1 times.
    handleFailedRequestFunction: async ({ request }) => {
      // Code you want to process when handlePageFunction failed
    },
  });

  // Run the crawler and wait for it to finish.
  await crawler.run();
  console.log('Crawler finished.');
});
