Skip to content

Commit

Permalink
run all adapters without promise pool
Browse files Browse the repository at this point in the history
  • Loading branch information
vrtnd committed Dec 9, 2024
1 parent df193f4 commit 3becc51
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 40 deletions.
69 changes: 37 additions & 32 deletions src/server/jobs/runAdaptersFromTo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,63 @@ import { runAdapterHistorical } from "../../utils/adapter";
import { sql } from "../../utils/db";
import { getBridgeID } from "../../utils/wrappa/postgres/query";
import dayjs from "dayjs";
import { PromisePool } from "@supercharge/promise-pool";

export const runAdaptersFromTo = async () => {
const fromTimestamp = dayjs().subtract(4, "hour").unix();
const toTimestamp = dayjs().unix();

await PromisePool.withConcurrency(5)
.for(bridgeNetworks)
.process(async (adapter) => {
const bridgeName = adapter.bridgeDbName;
await PromisePool.withConcurrency(5)
.for(adapter.chains)
.process(async (chain) => {
let nChain;
if (adapter.chainMapping && adapter.chainMapping[chain.toLowerCase()]) {
nChain = adapter.chainMapping[chain.toLowerCase()];
} else {
nChain = chain.toLowerCase();
}
if (nChain === adapter?.destinationChain?.toLowerCase()) return;
await Promise.all(
bridgeNetworks.map(async (adapter) => {
try {
const bridgeName = adapter.bridgeDbName;
await Promise.all(
adapter.chains.map(async (chain) => {
let nChain;
if (adapter.chainMapping && adapter.chainMapping[chain.toLowerCase()]) {
nChain = adapter.chainMapping[chain.toLowerCase()];
} else {
nChain = chain.toLowerCase();
}
if (nChain === adapter?.destinationChain?.toLowerCase()) return;

console.log(`Processing chain ${nChain} for ${bridgeName}`);
console.log(`Processing chain ${nChain} for ${bridgeName}`);

const bridgeConfig = await getBridgeID(bridgeName, nChain);
if (!bridgeConfig) {
console.error(`Could not find bridge config for ${nChain} on ${bridgeName}`);
return;
}
const fromTx = await sql<{ tx_block: number }[]>`
const bridgeConfig = await getBridgeID(bridgeName, nChain);
if (!bridgeConfig) {
console.error(`Could not find bridge config for ${nChain} on ${bridgeName}`);
return;
}
const fromTx = await sql<{ tx_block: number }[]>`
SELECT tx_block FROM bridges.transactions
WHERE bridge_id = ${bridgeConfig.id}
AND chain = ${nChain}
AND tx_block IS NOT NULL
AND ts <= to_timestamp(${fromTimestamp})
ORDER BY ts DESC LIMIT 1
`;
const fromBlock = fromTx[0].tx_block;
const toTx = await sql<{ tx_block: number }[]>`
const fromBlock = fromTx[0].tx_block;
const toTx = await sql<{ tx_block: number }[]>`
SELECT tx_block FROM bridges.transactions
WHERE bridge_id = ${bridgeConfig.id}
AND chain = ${nChain}
AND tx_block IS NOT NULL
AND ts >= to_timestamp(${toTimestamp})
ORDER BY ts ASC LIMIT 1
`;
const toBlock = toTx[0].tx_block;
const toBlock = toTx[0].tx_block;

if (!fromBlock || !toBlock) {
console.error(`Could not find transactions with blocks for ${nChain} on ${bridgeName}`);
return;
}
if (!fromBlock || !toBlock) {
console.error(`Could not find transactions with blocks for ${nChain} on ${bridgeName}`);
return;
}

await runAdapterHistorical(fromBlock, toBlock, adapter.id, nChain, true, false, "upsert");
});
});
await runAdapterHistorical(fromBlock, toBlock, adapter.id, nChain, true, false, "upsert");
})
);
} catch (e) {
console.error(`Failed to run adapter ${adapter.bridgeDbName}`);
console.error(e);
}
})
);
};
16 changes: 10 additions & 6 deletions src/server/jobs/runAllAdapters.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import bridgeNetworks from "../../data/bridgeNetworkData";
import { sql } from "../../utils/db";
import { runAdapterToCurrentBlock } from "../../utils/adapter";
import { PromisePool } from "@supercharge/promise-pool";

export const runAllAdapters = async () => {
const lastRecordedBlocks = await sql`SELECT jsonb_object_agg(bridge_id::text, subresult) as result
Expand All @@ -19,9 +18,14 @@ export const runAllAdapters = async () => {
console.error(e);
}

await PromisePool.withConcurrency(10)
.for(bridgeNetworks)
.process(async (bridge) => {
await runAdapterToCurrentBlock(bridge, true, "upsert", lastRecordedBlocks[0].result);
});
await Promise.all(
bridgeNetworks.map(async (adapter) => {
try {
await runAdapterToCurrentBlock(adapter, true, "upsert", lastRecordedBlocks[0].result);
} catch (e) {
console.error(`Failed to run adapter ${adapter.bridgeDbName}`);
console.error(e);
}
})
);
};
4 changes: 2 additions & 2 deletions src/utils/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ const connectionString =
const sql = postgres(connectionString, {
idle_timeout: 20,
max_lifetime: 60 * 30,
max: 5,
max: 7,
});

const querySql = postgres(connectionString, {
idle_timeout: 20,
max_lifetime: 60 * 30,
max: 5,
max: 4,
});

export { sql, querySql };

0 comments on commit 3becc51

Please sign in to comment.