From 3becc51ffde25844d42070dbb294cad385cad986 Mon Sep 17 00:00:00 2001 From: vrtnd Date: Mon, 9 Dec 2024 21:15:28 +0300 Subject: [PATCH] run all adapters without promise pool --- src/server/jobs/runAdaptersFromTo.ts | 69 +++++++++++++++------------- src/server/jobs/runAllAdapters.ts | 16 ++++--- src/utils/db.js | 4 +- 3 files changed, 49 insertions(+), 40 deletions(-) diff --git a/src/server/jobs/runAdaptersFromTo.ts b/src/server/jobs/runAdaptersFromTo.ts index 7fd12d4a..472548ff 100644 --- a/src/server/jobs/runAdaptersFromTo.ts +++ b/src/server/jobs/runAdaptersFromTo.ts @@ -3,34 +3,33 @@ import { runAdapterHistorical } from "../../utils/adapter"; import { sql } from "../../utils/db"; import { getBridgeID } from "../../utils/wrappa/postgres/query"; import dayjs from "dayjs"; -import { PromisePool } from "@supercharge/promise-pool"; + export const runAdaptersFromTo = async () => { const fromTimestamp = dayjs().subtract(4, "hour").unix(); const toTimestamp = dayjs().unix(); - await PromisePool.withConcurrency(5) - .for(bridgeNetworks) - .process(async (adapter) => { - const bridgeName = adapter.bridgeDbName; - await PromisePool.withConcurrency(5) - .for(adapter.chains) - .process(async (chain) => { - let nChain; - if (adapter.chainMapping && adapter.chainMapping[chain.toLowerCase()]) { - nChain = adapter.chainMapping[chain.toLowerCase()]; - } else { - nChain = chain.toLowerCase(); - } - if (nChain === adapter?.destinationChain?.toLowerCase()) return; + await Promise.all( + bridgeNetworks.map(async (adapter) => { + try { + const bridgeName = adapter.bridgeDbName; + await Promise.all( + adapter.chains.map(async (chain) => { + let nChain; + if (adapter.chainMapping && adapter.chainMapping[chain.toLowerCase()]) { + nChain = adapter.chainMapping[chain.toLowerCase()]; + } else { + nChain = chain.toLowerCase(); + } + if (nChain === adapter?.destinationChain?.toLowerCase()) return; - console.log(`Processing chain ${nChain} for ${bridgeName}`); + console.log(`Processing chain ${nChain} for ${bridgeName}`); - const bridgeConfig = await getBridgeID(bridgeName, nChain); - if (!bridgeConfig) { - console.error(`Could not find bridge config for ${nChain} on ${bridgeName}`); - return; - } - const fromTx = await sql<{ tx_block: number }[]>` + const bridgeConfig = await getBridgeID(bridgeName, nChain); + if (!bridgeConfig) { + console.error(`Could not find bridge config for ${nChain} on ${bridgeName}`); + return; + } + const fromTx = await sql<{ tx_block: number }[]>` SELECT tx_block FROM bridges.transactions WHERE bridge_id = ${bridgeConfig.id} AND chain = ${nChain} @@ -38,8 +37,8 @@ export const runAdaptersFromTo = async () => { AND ts <= to_timestamp(${fromTimestamp}) ORDER BY ts DESC LIMIT 1 `; - const fromBlock = fromTx[0].tx_block; - const toTx = await sql<{ tx_block: number }[]>` + const fromBlock = fromTx[0].tx_block; + const toTx = await sql<{ tx_block: number }[]>` SELECT tx_block FROM bridges.transactions WHERE bridge_id = ${bridgeConfig.id} AND chain = ${nChain} @@ -47,14 +46,20 @@ export const runAdaptersFromTo = async () => { AND ts >= to_timestamp(${toTimestamp}) ORDER BY ts ASC LIMIT 1 `; - const toBlock = toTx[0].tx_block; + const toBlock = toTx[0].tx_block; - if (!fromBlock || !toBlock) { - console.error(`Could not find transactions with blocks for ${nChain} on ${bridgeName}`); - return; - } + if (!fromBlock || !toBlock) { + console.error(`Could not find transactions with blocks for ${nChain} on ${bridgeName}`); + return; + } - await runAdapterHistorical(fromBlock, toBlock, adapter.id, nChain, true, false, "upsert"); - }); - }); + await runAdapterHistorical(fromBlock, toBlock, adapter.id, nChain, true, false, "upsert"); + }) + ); + } catch (e) { + console.error(`Failed to run adapter ${adapter.bridgeDbName}`); + console.error(e); + } + }) + ); }; diff --git a/src/server/jobs/runAllAdapters.ts b/src/server/jobs/runAllAdapters.ts index fc8a1851..1ba1767a 100644 --- a/src/server/jobs/runAllAdapters.ts +++ b/src/server/jobs/runAllAdapters.ts @@ -1,7 +1,6 @@ import bridgeNetworks from "../../data/bridgeNetworkData"; import { sql } from "../../utils/db"; import { runAdapterToCurrentBlock } from "../../utils/adapter"; -import { PromisePool } from "@supercharge/promise-pool"; export const runAllAdapters = async () => { const lastRecordedBlocks = await sql`SELECT jsonb_object_agg(bridge_id::text, subresult) as result @@ -19,9 +18,14 @@ export const runAllAdapters = async () => { console.error(e); } - await PromisePool.withConcurrency(10) - .for(bridgeNetworks) - .process(async (bridge) => { - await runAdapterToCurrentBlock(bridge, true, "upsert", lastRecordedBlocks[0].result); - }); + await Promise.all( + bridgeNetworks.map(async (adapter) => { + try { + await runAdapterToCurrentBlock(adapter, true, "upsert", lastRecordedBlocks[0].result); + } catch (e) { + console.error(`Failed to run adapter ${adapter.bridgeDbName}`); + console.error(e); + } + }) + ); }; diff --git a/src/utils/db.js b/src/utils/db.js index 8a6454ea..dcb895b5 100644 --- a/src/utils/db.js +++ b/src/utils/db.js @@ -10,13 +10,13 @@ const connectionString = const sql = postgres(connectionString, { idle_timeout: 20, max_lifetime: 60 * 30, - max: 5, + max: 7, }); const querySql = postgres(connectionString, { idle_timeout: 20, max_lifetime: 60 * 30, - max: 5, + max: 4, }); export { sql, querySql };