我必须通过api2cart API从shopify提取大量数据。我需要拉2/3年的数据,这需要几个小时来拉,因为我们每个请求只能拉250个项目。对于每个响应,它提供下一个250个数据拉取的键,以此类推。
在我的后端,我通过fs提取数据并将其保存在csv文件中,然后我再次为接下来的250项调用api。它在我的本地机器上运行良好。该过程将继续进行,直到获取所有数据为止。我可以拉数年的数据,大约需要2-3小时才能拉出大约100k/150k的数据。
然后我建立了一个NestJS微服务,并部署在一个数字海洋服务器上。但是当我长时间发出API请求时,过了一会儿,服务器给我一个504网关超时错误。
不能使用setTimeout,因为这个进程没有限制。有没有办法在数小时或数天内持续提取数据?
如何在没有任何504网关超时错误的情况下拉数据数小时?
长时间运行的请求通常容易出错。连接重置可能导致重新启动整个进程。尽管这不会解决你所描述的数字海洋的潜在问题,但我认为你值得考虑一个不同的解决方案。我建议把繁重的、长时间运行的任务拆分成许多小任务,并使用队列系统。
Nestjs提供了一个非常好的关于队列和bull包的文档。
我添加了一个基本的例子,有两个解决方案:
队列使用者
shopify.consumer.ts
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';
@Processor('shopify')
export class ShopifyConsumer {
constructor(
private shopifyService: ShopifyService
) {}
@Process('fetch')
async transcode(job: Job<unknown>) {
await this.shopifyService.fetch(job.requestKey);
}
}
选项a)一次生成所有请求并让队列处理它们:
shopify.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class ShopifyService {
constructor(
@InjectQueue('shopify') private shopifyQueue: Queue
) {}
generateJobs(requestKeys: string[]) {
for (const requestKey of requestKeys) {
await this.shopifyQueue.add('fetch', {
requestKey
});
}
}
fetch(requestKey: string) {
// Fetch data
const res = await fetch('...')
}
}
选项b)在每个响应之后生成一个新的队列作业
shopify.service.ts
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class ShopifyService {
constructor(
@InjectQueue('shopify') private shopifyQueue: Queue
) {}
fetch(requestKey: string) {
// Fetch data
const res = await fetch('...')
// Add next job to queue if more chunks are available
if (res.nextRequestKey) {
await this.shopifyQueue.add('fetch', {
requestKey: res.nextRequestKey
})
}
}
}
Queues和Bull在NestJS中工作得很好,我推荐它们用于这些长时间的调用。它真的很有用,因为你也可以重试调用,并为失败的作业添加超时(以帮助417错误)。
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
@Injectable()
export class ShopifyService {
constructor(
@InjectQueue('shopify') private shopifyQueue: Queue
) {}
async generateJobs(requestKeys: string[]) {
await this.shopifyQueue.addBulk('${QUEUE_NAME}.${JOB_NAME}', {
...jobData //what is needed to make API call
},
{
attempts: 5,
backoff: 5000,
//Will try 5 times with backoff
});
}
}
```