diff --git a/migrations/1729684505754_reward_set_signers.ts b/migrations/1729684505754_reward_set_signers.ts index 6ce13ad..7a27a35 100644 --- a/migrations/1729684505754_reward_set_signers.ts +++ b/migrations/1729684505754_reward_set_signers.ts @@ -39,4 +39,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('reward_set_signers', ['signer_key']); pgm.createIndex('reward_set_signers', ['block_height']); pgm.createIndex('reward_set_signers', ['cycle_number']); + + pgm.createConstraint('reward_set_signers', 'reward_set_signers_cycle_unique', { + unique: ['signer_key', 'cycle_number'], + }); } diff --git a/src/index.ts b/src/index.ts index 385c305..3ce4c17 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,7 @@ import { isProdEnv } from './helpers'; import { buildProfilerServer, logger, registerShutdownConfig } from '@hirosystems/api-toolkit'; import { closeChainhookServer, startChainhookServer } from './chainhook/server'; import { startPoxInfoUpdater } from './stacks-core-rpc/pox-info-updater'; +import { StackerSetUpdator } from './stacks-core-rpc/stacker-set-updater'; /** * Initializes background services. Only for `default` and `writeonly` run modes. @@ -22,6 +23,15 @@ async function initBackgroundServices(db: PgStore) { }, }); + const stackerSetUpdater = new StackerSetUpdator({ db }); + registerShutdownConfig({ + name: 'StackerSet fetcher', + forceKillable: false, + handler: async () => { + await stackerSetUpdater.stop(); + }, + }); + const server = await startChainhookServer({ db }); registerShutdownConfig({ name: 'Chainhook Server', diff --git a/src/pg/chainhook/chainhook-pg-store.ts b/src/pg/chainhook/chainhook-pg-store.ts index e31612f..2ba5f90 100644 --- a/src/pg/chainhook/chainhook-pg-store.ts +++ b/src/pg/chainhook/chainhook-pg-store.ts @@ -18,6 +18,7 @@ import { DbRewardSetSigner, } from '../types'; import { normalizeHexString, unixTimeMillisecondsToISO, unixTimeSecondsToISO } from '../../helpers'; +import { EventEmitter } from 'node:events'; const RejectReasonValidationFailed = 'VALIDATION_FAILED'; @@ -52,6 +53,8 @@ type MockBlockData = Extract< >['data']; export class ChainhookPgStore extends BasePgStoreModule { + readonly events = new EventEmitter<{ missingStackerSet: [{ cycleNumber: number }] }>(); + async processPayload(payload: StacksPayload): Promise { await this.sqlWriteTransaction(async sql => { for (const block of payload.rollback) { @@ -418,9 +421,38 @@ export class ChainhookPgStore extends BasePgStoreModule { } private async insertBlock(sql: PgSqlClient, dbBlock: DbBlock) { - await sql` - INSERT INTO blocks ${sql(dbBlock)} + // After the block is inserted, calculate the reward_cycle_number, then check if the reward_set_signers + // table contains any rows for the calculated cycle_number. + const result = await sql<{ cycle_number: number | null; reward_set_exists: boolean }[]>` + WITH inserted AS ( + INSERT INTO blocks ${sql(dbBlock)} + RETURNING burn_block_height + ), + cycle_number AS ( + SELECT FLOOR((inserted.burn_block_height - ct.first_burnchain_block_height) / ct.reward_cycle_length) AS cycle_number + FROM inserted, chain_tip AS ct + LIMIT 1 + ) + SELECT + cn.cycle_number, + EXISTS ( + SELECT 1 + FROM reward_set_signers + WHERE cycle_number = cn.cycle_number + LIMIT 1 + ) AS reward_set_exists + FROM cycle_number AS cn `; + const { cycle_number, reward_set_exists } = result[0]; + if (cycle_number === null) { + logger.warn(`Failed to calculate cycle number for block ${dbBlock.block_height}`); + } else if (cycle_number !== null && !reward_set_exists) { + logger.warn( + `Missing reward set signers for cycle ${cycle_number} in block ${dbBlock.block_height}` + ); + // Use setImmediate to ensure we break out of the current sql transaction within the async context + setImmediate(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number })); + } logger.info(`ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}`); } @@ -435,11 +467,17 @@ export class ChainhookPgStore extends BasePgStoreModule { } } - private async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) { + async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) { for await (const batch of batchIterate(rewardSetSigners, 500)) { - await sql` + const result = await sql` INSERT INTO reward_set_signers ${sql(batch)} + ON CONFLICT ON CONSTRAINT reward_set_signers_cycle_unique DO NOTHING `; + if (result.count === 0) { + logger.warn( + `Skipped inserting duplicate reward set signers for cycle ${rewardSetSigners[0].cycle_number}` + ); + } } } diff --git a/src/stacks-core-rpc/pox-info-updater.ts b/src/stacks-core-rpc/pox-info-updater.ts index 0945430..a383d88 100644 --- a/src/stacks-core-rpc/pox-info-updater.ts +++ b/src/stacks-core-rpc/pox-info-updater.ts @@ -1,7 +1,7 @@ import { logger } from '@hirosystems/api-toolkit'; import { PgStore } from '../pg/pg-store'; import { sleep } from '../helpers'; -import { ENV } from '../env'; +import { fetchRpcPoxInfo, getStacksNodeUrl } from './stacks-core-rpc-client'; // How long to wait between PoX rpc fetches when the database already has PoX info const POX_INFO_UPDATE_INTERVAL_MS = 30000; @@ -61,19 +61,3 @@ async function runPoxInfoBackgroundJob(db: PgStore, abortSignal: AbortSignal) { } } } - -interface PoxInfo { - first_burnchain_block_height: number; - reward_cycle_length: number; -} - -function getStacksNodeUrl(): string { - return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`; -} - -async function fetchRpcPoxInfo(abortSignal: AbortSignal) { - const url = `${getStacksNodeUrl()}/v2/pox`; - const res = await fetch(url, { signal: abortSignal }); - const json = await res.json(); - return json as PoxInfo; -} diff --git a/src/stacks-core-rpc/stacker-set-updater.ts b/src/stacks-core-rpc/stacker-set-updater.ts new file mode 100644 index 0000000..ed9ac0b --- /dev/null +++ b/src/stacks-core-rpc/stacker-set-updater.ts @@ -0,0 +1,97 @@ +import { PgStore } from '../pg/pg-store'; +import PQueue from 'p-queue'; +import { fetchStackerSet, getStacksNodeUrl } from './stacks-core-rpc-client'; +import { sleep } from '../helpers'; +import { logger } from '@hirosystems/api-toolkit'; +import { DbRewardSetSigner } from '../pg/types'; + +// TODO: make this configurable +// How long to wait between retries when fetching fails +const FETCH_STACKER_SET_RETRY_INTERVAL_MS = 3000; + +// TODO: make this configurable +const FETCH_STACKER_SET_CONCURRENCY_LIMIT = 2; + +export class StackerSetUpdator { + private readonly queue: PQueue; + private readonly db: PgStore; + private readonly abortController: AbortController; + private readonly queuedCycleNumbers = new Set(); + + constructor(args: { db: PgStore }) { + this.db = args.db; + this.abortController = new AbortController(); + this.queue = new PQueue({ + concurrency: FETCH_STACKER_SET_CONCURRENCY_LIMIT, + autoStart: true, + }); + this.db.chainhook.events.on('missingStackerSet', ({ cycleNumber }) => { + this.add({ cycleNumber }); + }); + } + + async stop() { + this.abortController.abort(); + await this.queue.onIdle(); + this.queue.pause(); + } + + add({ cycleNumber }: { cycleNumber: number }): void { + if (this.queuedCycleNumbers.has(cycleNumber) || this.abortController.signal.aborted) { + return; + } + this.queuedCycleNumbers.add(cycleNumber); + void this.queue + .add(() => this.fetchStackerSet(cycleNumber)) + .catch(error => { + if (!this.abortController.signal.aborted) { + logger.error(error, `Unexpected stacker-set fetch queue error for cycle ${cycleNumber}`); + this.queuedCycleNumbers.delete(cycleNumber); + } + }); + } + + private async fetchStackerSet(cycleNumber: number) { + try { + logger.info(`Fetching stacker set for cycle ${cycleNumber} from stacks-core RPC ...`); + const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal); + if (stackerSet.prePox4) { + logger.info(`Skipping stacker set update for cycle ${cycleNumber}, PoX-4 not yet active`); + this.queuedCycleNumbers.delete(cycleNumber); + return; // Exit job successful fetch + } + logger.info(`Fetched stacker set for cycle ${cycleNumber}, updating database ...`); + const dbRewardSetSigners = stackerSet.response.stacker_set.signers.map(entry => { + const rewardSetSigner: DbRewardSetSigner = { + cycle_number: cycleNumber, + block_height: 0, + burn_block_height: 0, + signer_key: Buffer.from(entry.signing_key.replace(/^0x/, ''), 'hex'), + signer_weight: entry.weight, + signer_stacked_amount: entry.stacked_amt.toString(), + }; + return rewardSetSigner; + }); + await this.db.chainhook.sqlWriteTransaction(async sql => { + await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners); + }); + logger.info( + `Updated database with stacker set for cycle ${cycleNumber}, ${dbRewardSetSigners.length} signers` + ); + this.queuedCycleNumbers.delete(cycleNumber); + } catch (error) { + if (this.abortController.signal.aborted) { + return; // Updater service was stopped, ignore error and exit loop + } + logger.warn( + error, + `Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...` + ); + await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal); + setImmediate(() => { + this.queuedCycleNumbers.delete(cycleNumber); + this.add({ cycleNumber }); + }); + } + } +} diff --git a/src/stacks-core-rpc/stacks-core-rpc-client.ts b/src/stacks-core-rpc/stacks-core-rpc-client.ts new file mode 100644 index 0000000..ea03b28 --- /dev/null +++ b/src/stacks-core-rpc/stacks-core-rpc-client.ts @@ -0,0 +1,49 @@ +import { ENV } from '../env'; + +export function getStacksNodeUrl(): string { + return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`; +} + +export interface PoxInfo { + first_burnchain_block_height: number; + reward_cycle_length: number; +} + +export async function fetchRpcPoxInfo(abortSignal: AbortSignal) { + const url = `${getStacksNodeUrl()}/v2/pox`; + const res = await fetch(url, { signal: abortSignal }); + const json = await res.json(); + return json as PoxInfo; +} + +export interface RpcStackerSetResponse { + stacker_set: { + rewarded_addresses: any[]; + start_cycle_state: { + missed_reward_slots: any[]; + }; + pox_ustx_threshold: number; + signers: { + signing_key: string; + stacked_amt: number; + weight: number; + }[]; + }; +} + +export async function fetchStackerSet( + cycleNumber: number, + abortSignal: AbortSignal +): Promise<{ prePox4: true } | { prePox4: false; response: RpcStackerSetResponse }> { + const url = `${getStacksNodeUrl()}/v3/stacker_set/${cycleNumber}`; + const res = await fetch(url, { signal: abortSignal }); + const json = await res.json(); + if (!res.ok) { + const err = JSON.stringify(json); + if (/Pre-PoX-4/i.test(err)) { + return { prePox4: true }; + } + throw new Error(`Failed to fetch stacker set for cycle ${cycleNumber}: ${err}`); + } + return { prePox4: false, response: json as RpcStackerSetResponse }; +}