Skip to content

Commit

Permalink
use daily volume
Browse files Browse the repository at this point in the history
  • Loading branch information
vrtnd committed Dec 3, 2024
1 parent 319932b commit bcbcd23
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 45 deletions.
21 changes: 21 additions & 0 deletions sql/data.sql
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,24 @@ CREATE INDEX IF NOT EXISTS hourly_volume_chain ON bridges.hourly_volume (chain);

CREATE INDEX IF NOT EXISTS hourly_volume_bridge_ts_chain ON bridges.hourly_volume (bridge_id, ts, chain);
CREATE INDEX IF NOT EXISTS config_bridge_name_id ON bridges.config (bridge_name, id);

CREATE TABLE IF NOT EXISTS bridges.daily_volume (
id INT GENERATED ALWAYS AS IDENTITY,
bridge_id uuid NOT NULL,
ts TIMESTAMPTZ NOT NULL,
total_deposited_usd NUMERIC,
total_withdrawn_usd NUMERIC,
total_deposit_txs INTEGER,
total_withdrawal_txs INTEGER,
chain VARCHAR NOT NULL,
PRIMARY KEY(id),
UNIQUE (bridge_id, ts, chain),
CONSTRAINT fk_bridge_id
FOREIGN KEY(bridge_id)
REFERENCES bridges.config(id)
ON DELETE CASCADE
);

CREATE INDEX IF NOT EXISTS daily_volume_ts ON bridges.daily_volume (ts);
CREATE INDEX IF NOT EXISTS daily_volume_chain ON bridges.daily_volume (chain);
CREATE INDEX IF NOT EXISTS daily_volume_bridge_ts_chain ON bridges.daily_volume (bridge_id, ts, chain);
25 changes: 2 additions & 23 deletions src/handlers/getBridges.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,10 @@ const getBridges = async () => {
let weeklyVolume = 0;
let monthlyVolume = 0;
const startOfTheDayTs = getTimestampAtStartOfDay(getCurrentUnixTimestamp());
let chainDailyVolumes = {} as any;
const dailyStartTimestamp = startOfTheDayTs - 30 * secondsInDay;
const [lastMonthDailyVolume, last24hVolume] = await Promise.all([
getDailyBridgeVolume(dailyStartTimestamp, startOfTheDayTs, undefined, id),
getLast24HVolume(bridgeDbName),
Promise.all(
chains.map(async (chain) => {
const queryChain = normalizeChain(chain);
let chainLastDailyVolume;
const chainLastMonthDailyVolume = await getDailyBridgeVolume(
dailyStartTimestamp,
startOfTheDayTs,
queryChain,
id
);
let chainLastDailyTs = 0;
if (chainLastMonthDailyVolume?.length) {
const chainLastDailyVolumeRecord = chainLastMonthDailyVolume[chainLastMonthDailyVolume.length - 1];
chainLastDailyTs = parseInt(chainLastDailyVolumeRecord.date);
chainLastDailyVolume =
(chainLastDailyVolumeRecord.depositUSD + chainLastDailyVolumeRecord.withdrawUSD) / 2;
}
chainDailyVolumes[chain] = chainLastDailyVolume ?? 0;
})
),
]);

let lastDailyTs = 0;
Expand Down Expand Up @@ -71,15 +50,15 @@ const getBridges = async () => {
displayName: displayName,
// url: url,
icon: iconLink,
volumePrevDay: lastDailyVolume ?? 0,
volumePrevDay: last24hVolume ?? 0,
volumePrev2Day: dayBeforeLastVolume ?? 0,
lastHourlyVolume: lastHourlyVolume ?? 0,
last24hVolume: last24hVolume ?? 0,
lastDailyVolume: last24hVolume ?? 0,
dayBeforeLastVolume: dayBeforeLastVolume ?? 0,
weeklyVolume: weeklyVolume ?? 0,
monthlyVolume: monthlyVolume ?? 0,
chains: chains.sort((a, b) => chainDailyVolumes[b] - chainDailyVolumes[a]),
chains: chains,
destinationChain: destinationChain ?? "false",
url,
} as any;
Expand Down
7 changes: 6 additions & 1 deletion src/server/cron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { runAggregateAllAdapters } from "./jobs/runAggregateAllAdapter";
import { runAdaptersFromTo } from "./jobs/runAdaptersFromTo";
import { handler as runWormhole } from "../handlers/runWormhole";
import { aggregateHourlyVolume } from "./jobs/aggregateHourlyVolume";
import { aggregateDailyVolume } from "./jobs/aggregateDailyVolume";

const createTimeout = (minutes: number) =>
new Promise((_, reject) =>
Expand Down Expand Up @@ -41,7 +42,11 @@ const cron = () => {
}).start();

new CronJob("20 * * * *", async () => {
await withTimeout(aggregateHourlyVolume(), 10);
await withTimeout(aggregateHourlyVolume(), 20);
}).start();

new CronJob("0 0 * * *", async () => {
await withTimeout(aggregateDailyVolume(), 20);
}).start();
};

Expand Down
43 changes: 43 additions & 0 deletions src/server/jobs/aggregateDailyVolume.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import { sql } from "../../utils/db";

async function aggregateDailyVolume() {
try {
await sql`
INSERT INTO bridges.daily_volume (
bridge_id,
ts,
total_deposited_usd,
total_withdrawn_usd,
total_deposit_txs,
total_withdrawal_txs,
chain
)
SELECT
ha.bridge_id,
date_trunc('day', ha.ts) as ts,
SUM(CAST(ha.total_deposited_usd AS NUMERIC)) as total_deposited_usd,
SUM(CAST(ha.total_withdrawn_usd AS NUMERIC)) as total_withdrawn_usd,
SUM(CAST(ha.total_deposit_txs AS INTEGER)) as total_deposit_txs,
SUM(CAST(ha.total_withdrawal_txs AS INTEGER)) as total_withdrawal_txs,
c.chain
FROM bridges.hourly_aggregated ha
JOIN bridges.config c ON ha.bridge_id = c.id
GROUP BY
ha.bridge_id,
date_trunc('day', ha.ts),
c.chain
ON CONFLICT (bridge_id, ts, chain) DO UPDATE SET
total_deposited_usd = EXCLUDED.total_deposited_usd,
total_withdrawn_usd = EXCLUDED.total_withdrawn_usd,
total_deposit_txs = EXCLUDED.total_deposit_txs,
total_withdrawal_txs = EXCLUDED.total_withdrawal_txs;
`;

console.log("Daily volume aggregation completed successfully");
} catch (error) {
console.error("Error during daily volume aggregation:", error);
throw error;
}
}

export { aggregateDailyVolume };
2 changes: 0 additions & 2 deletions src/utils/bridgeVolume.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ export const getHourlyBridgeVolume = async (
({ bridgeDbName } = bridgeNetwork);
}

const chainIdsWithSingleEntry = (await getConfigsWithDestChain()).map((config) => config.id);

let sourceChainConfigs = [] as IConfig[];
if (chain) {
sourceChainConfigs = (await queryConfig(undefined, undefined, chain)).filter((config) => {
Expand Down
38 changes: 19 additions & 19 deletions src/utils/wrappa/postgres/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ const queryAggregatedDailyTimestampRange = async (
chain?: string,
bridgeNetworkName?: string
) => {
let conditions = sql`WHERE date_trunc('day', hv.ts) >= to_timestamp(${startTimestamp})::date
AND date_trunc('day', hv.ts) <= to_timestamp(${endTimestamp})::date`;
let conditions = sql`WHERE date_trunc('day', dv.ts) >= to_timestamp(${startTimestamp})::date
AND date_trunc('day', dv.ts) <= to_timestamp(${endTimestamp})::date`;

if (chain) {
conditions = sql`${conditions} AND hv.chain = ${chain}`;
conditions = sql`${conditions} AND dv.chain = ${chain}`;
}

if (bridgeNetworkName) {
Expand All @@ -132,22 +132,22 @@ const queryAggregatedDailyTimestampRange = async (

const result = await sql<IAggregatedData[]>`
SELECT
hv.bridge_id,
date_trunc('day', hv.ts) as ts,
CAST(SUM(hv.total_deposited_usd) AS TEXT) as total_deposited_usd,
CAST(SUM(hv.total_withdrawn_usd) AS TEXT) as total_withdrawn_usd,
CAST(COALESCE(SUM(hv.total_deposit_txs), 0) AS INTEGER) as total_deposit_txs,
CAST(COALESCE(SUM(hv.total_withdrawal_txs), 0) AS INTEGER) as total_withdrawal_txs,
hv.chain
dv.bridge_id,
dv.ts,
CAST(SUM(dv.total_deposited_usd) AS TEXT) as total_deposited_usd,
CAST(SUM(dv.total_withdrawn_usd) AS TEXT) as total_withdrawn_usd,
CAST(COALESCE(SUM(dv.total_deposit_txs), 0) AS INTEGER) as total_deposit_txs,
CAST(COALESCE(SUM(dv.total_withdrawal_txs), 0) AS INTEGER) as total_withdrawal_txs,
dv.chain
FROM
bridges.hourly_volume hv
bridges.daily_volume dv
JOIN
bridges.config c ON hv.bridge_id = c.id
bridges.config c ON dv.bridge_id = c.id
${conditions}
GROUP BY
hv.bridge_id,
date_trunc('day', hv.ts),
hv.chain
dv.bridge_id,
dv.ts,
dv.chain
ORDER BY ts;
`;
return result;
Expand Down Expand Up @@ -340,7 +340,7 @@ const getLast24HVolume = async (bridgeName: string, volumeType: VolumeType = "bo

const result = await sql<{ total_volume: string }[]>`
SELECT COALESCE(SUM(${volumeColumn}), 0) as total_volume
FROM bridges.hourly_aggregated ha
FROM bridges.hourly_volume ha
JOIN bridges.config c ON ha.bridge_id = c.id
WHERE c.bridge_name = ${bridgeName}
AND ha.ts >= to_timestamp(${twentyFourHoursAgo})
Expand Down Expand Up @@ -371,10 +371,10 @@ const getNetflows = async (period: TimePeriod) => {
WHEN c.destination_chain IS NULL THEN (hv.total_withdrawn_usd - hv.total_deposited_usd)
ELSE (hv.total_deposited_usd - hv.total_withdrawn_usd)
END) as net_flow
FROM bridges.hourly_volume hv
FROM bridges.daily_volume hv
JOIN bridges.config c ON hv.bridge_id = c.id
WHERE hv.ts >= date_trunc(${period}, NOW() AT TIME ZONE 'UTC') - ${intervalPeriod}
AND hv.ts < date_trunc(${period}, NOW() AT TIME ZONE 'UTC')
WHERE hv.ts >= date_trunc('day', NOW() AT TIME ZONE 'UTC') - ${intervalPeriod}
AND hv.ts < date_trunc('day', NOW() AT TIME ZONE 'UTC')
AND LOWER(hv.chain) NOT LIKE '%dydx%'
GROUP BY hv.chain
)
Expand Down

0 comments on commit bcbcd23

Please sign in to comment.