node js函数中的并行处理



我有一个函数,它有超过1400+个加密货币对,我必须针对每个对发送一个API并存储交易。现在每对需要3-4秒,因此整个功能需要花费大量时间。我正在从我的数据库中获取配对,我也在数据库中存储交易数据。我需要并行处理对,这样一开始对的交易就不会因为函数没有处理而错过。

这是我目前的功能:

const getTrades = async () => {
let page = 1;
const results = await db.query("SELECT * FROM pairs;");
const pairs = results.rows;
const latest = await db.query("SELECT MAX(trade_time) FROM trades");
const latestTrade = latest.rows[0].max;
const coinResult = await db.query("SELECT * FROM coins");
let coinsInfo = coinResult.rows;
coinsInfo = coinsInfo.flat();
for (const pair of pairs) {
let biggestTrade = [];
const response = await axios.get(
`https://api.binance.com/api/v3/trades?symbol=${pair.pair}`
);
let filtered = response.data;
filtered = filtered.filter((trade) => trade.time > latestTrade);
let sells = filtered.filter((trade) => trade.isBuyerMaker === true);
let buys = filtered.filter((trade) => trade.isBuyerMaker === false);
if (sells.length > 0) {
biggestTrade.push(
sells.reduce(function (prev, current) {
return prev.quoteQty > current.quoteQty ? prev : current;
})
);
}
if (buys.length > 0) {
biggestTrade.push(
buys.reduce(function (prev, current) {
return prev.quoteQty > current.quoteQty ? prev : current;
})
);
}
biggestTrade = biggestTrade.flat();
for (const trade of filtered) {
let priceUSD = 0;
let baseAssetIcon = "null";
for (const coin of coinsInfo) {
if (coin.symbol.toUpperCase() === pair.quote_asset) {
priceUSD = coin.current_price;
}
if (coin.symbol.toUpperCase() === pair.base_asset) {
baseAssetIcon = coin.image_url;
}
if (priceUSD > 0 && baseAssetIcon != "null") {
break;
}
}
if (trade.quoteQty * priceUSD > 50000) {
const results = db.query(
"INSERT INTO trades (exchange_name, exchange_icon_url, trade_time, price_in_quote_asset,price_in_usd, trade_value, base_asset_icon,  qty, quoteQty, is_buyer_maker, pair, base_asset_trade, quote_asset_trade) VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12, $13)",
[
"Binance",
"https://assets.coingecko.com/markets/images/52/small/binance.jpg?1519353250",
trade.time,
trade.price,
priceUSD,
trade.quoteQty * priceUSD,
baseAssetIcon,
trade.qty,
trade.quoteQty,
trade.isBuyerMaker,
pair.pair,
pair.base_asset,
pair.quote_asset,
]
);
console.log("TRADE ADDED");
}
}
}
console.log("PAIRS ARE OVER");
};
pairs has over 1400 entries and this is the one where are looping through. 

取决于运行此功能的服务器数量。

如果是一台机器,使用worker_threads,基本上在单独的线程中运行相同的函数来实现并行化,但老实说,1400对是很多的,每对3-4秒,所以如果是串行的,每次运行总共大约1-2小时。根据您的机器,如果您有8个核心,它可能会将时间减少8倍,但仍会让您大约停留10分钟。对于拥有更多cpu核心的实例,云服务通常会收取更高的费用。

如果是多台机器,使用一个master和一个队列将新的线程对推送到每台工作机器,对于每台工作计算机,您还可以为每台机器生成多个线程,这样您就可以水平扩展,并且可以在几秒钟内完成运行。在这种情况下,每台机器都可以从云提供商那里买到便宜的。

所以取决于你的要求,如果你想超快速,你必须增加更多的机器。

最新更新