From 76877a5729ab0cbfee41860abe7750f5fce7d1ec Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 28 Oct 2024 14:28:33 +0100 Subject: [PATCH 1/3] feat: stacker-set updater background service --- .../1729684505754_reward_set_signers.ts | 4 + src/index.ts | 10 +++ src/pg/chainhook/chainhook-pg-store.ts | 10 ++- src/stacks-core-rpc/pox-info-updater.ts | 18 +--- src/stacks-core-rpc/stacker-set-updater.ts | 84 +++++++++++++++++++ src/stacks-core-rpc/stacks-core-rpc-client.ts | 39 +++++++++ 6 files changed, 146 insertions(+), 19 deletions(-) create mode 100644 src/stacks-core-rpc/stacker-set-updater.ts create mode 100644 src/stacks-core-rpc/stacks-core-rpc-client.ts diff --git a/migrations/1729684505754_reward_set_signers.ts b/migrations/1729684505754_reward_set_signers.ts index 6ce13ad..7a27a35 100644 --- a/migrations/1729684505754_reward_set_signers.ts +++ b/migrations/1729684505754_reward_set_signers.ts @@ -39,4 +39,8 @@ export function up(pgm: MigrationBuilder): void { pgm.createIndex('reward_set_signers', ['signer_key']); pgm.createIndex('reward_set_signers', ['block_height']); pgm.createIndex('reward_set_signers', ['cycle_number']); + + pgm.createConstraint('reward_set_signers', 'reward_set_signers_cycle_unique', { + unique: ['signer_key', 'cycle_number'], + }); } diff --git a/src/index.ts b/src/index.ts index 385c305..3ce4c17 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,7 @@ import { isProdEnv } from './helpers'; import { buildProfilerServer, logger, registerShutdownConfig } from '@hirosystems/api-toolkit'; import { closeChainhookServer, startChainhookServer } from './chainhook/server'; import { startPoxInfoUpdater } from './stacks-core-rpc/pox-info-updater'; +import { StackerSetUpdator } from './stacks-core-rpc/stacker-set-updater'; /** * Initializes background services. Only for `default` and `writeonly` run modes. @@ -22,6 +23,15 @@ async function initBackgroundServices(db: PgStore) { }, }); + const stackerSetUpdater = new StackerSetUpdator({ db }); + registerShutdownConfig({ + name: 'StackerSet fetcher', + forceKillable: false, + handler: async () => { + await stackerSetUpdater.stop(); + }, + }); + const server = await startChainhookServer({ db }); registerShutdownConfig({ name: 'Chainhook Server', diff --git a/src/pg/chainhook/chainhook-pg-store.ts b/src/pg/chainhook/chainhook-pg-store.ts index e31612f..8ef0586 100644 --- a/src/pg/chainhook/chainhook-pg-store.ts +++ b/src/pg/chainhook/chainhook-pg-store.ts @@ -435,11 +435,17 @@ export class ChainhookPgStore extends BasePgStoreModule { } } - private async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) { + async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) { for await (const batch of batchIterate(rewardSetSigners, 500)) { - await sql` + const result = await sql` INSERT INTO reward_set_signers ${sql(batch)} + ON CONFLICT ON CONSTRAINT reward_set_signers_cycle_unique DO NOTHING `; + if (result.count === 0) { + logger.warn( + `Skipped inserting duplicate reward set signers for cycle ${rewardSetSigners[0].cycle_number}` + ); + } } } diff --git a/src/stacks-core-rpc/pox-info-updater.ts b/src/stacks-core-rpc/pox-info-updater.ts index 0945430..a383d88 100644 --- a/src/stacks-core-rpc/pox-info-updater.ts +++ b/src/stacks-core-rpc/pox-info-updater.ts @@ -1,7 +1,7 @@ import { logger } from '@hirosystems/api-toolkit'; import { PgStore } from '../pg/pg-store'; import { sleep } from '../helpers'; -import { ENV } from '../env'; +import { fetchRpcPoxInfo, getStacksNodeUrl } from './stacks-core-rpc-client'; // How long to wait between PoX rpc fetches when the database already has PoX info const POX_INFO_UPDATE_INTERVAL_MS = 30000; @@ -61,19 +61,3 @@ async function runPoxInfoBackgroundJob(db: PgStore, abortSignal: AbortSignal) { } } } - -interface PoxInfo { - first_burnchain_block_height: number; - reward_cycle_length: number; -} - -function getStacksNodeUrl(): string { - return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`; -} - -async function fetchRpcPoxInfo(abortSignal: AbortSignal) { - const url = `${getStacksNodeUrl()}/v2/pox`; - const res = await fetch(url, { signal: abortSignal }); - const json = await res.json(); - return json as PoxInfo; -} diff --git a/src/stacks-core-rpc/stacker-set-updater.ts b/src/stacks-core-rpc/stacker-set-updater.ts new file mode 100644 index 0000000..5fba8e4 --- /dev/null +++ b/src/stacks-core-rpc/stacker-set-updater.ts @@ -0,0 +1,84 @@ +import { PgStore } from '../pg/pg-store'; +import PQueue from 'p-queue'; +import { fetchStackerSet } from './stacks-core-rpc-client'; +import { sleep } from '../helpers'; +import { logger } from '@hirosystems/api-toolkit'; +import { DbRewardSetSigner } from '../pg/types'; + +// TODO: make this configurable +// How long to wait between retries when fetching fails +const FETCH_STACKER_SET_RETRY_INTERVAL_MS = 3000; + +// TODO: make this configurable +const FETCH_STACKER_SET_CONCURRENCY_LIMIT = 2; + +export class StackerSetUpdator { + private readonly queue: PQueue; + private readonly db: PgStore; + private readonly abortController: AbortController; + private readonly queuedCycleNumbers = new Set(); + + constructor(args: { db: PgStore }) { + this.db = args.db; + this.abortController = new AbortController(); + this.queue = new PQueue({ + concurrency: FETCH_STACKER_SET_CONCURRENCY_LIMIT, + autoStart: true, + }); + } + + async stop() { + this.abortController.abort(); + await this.queue.onIdle(); + this.queue.pause(); + } + + add({ cycleNumber }: { cycleNumber: number }): void { + if (this.queuedCycleNumbers.has(cycleNumber) || this.abortController.signal.aborted) { + return; + } + this.queuedCycleNumbers.add(cycleNumber); + void this.queue + .add(() => this.fetchStackerSet(cycleNumber)) + .catch(error => { + if (!this.abortController.signal.aborted) { + // Should never reach here unless there's a bug in error handling + logger.error(error, `Unexpected stacker-set fetch queue error for cycle ${cycleNumber}`); + this.queuedCycleNumbers.delete(cycleNumber); + } + }); + } + + private async fetchStackerSet(cycleNumber: number) { + while (!this.abortController.signal.aborted) { + try { + const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal); + const dbRewardSetSigners = stackerSet.stacker_set.signers.map(entry => { + const rewardSetSigner: DbRewardSetSigner = { + cycle_number: cycleNumber, + block_height: 0, + burn_block_height: 0, + signer_key: Buffer.from(entry.signing_key.replace(/^0x/, ''), 'hex'), + signer_weight: entry.weight, + signer_stacked_amount: entry.stacked_amt.toString(), + }; + return rewardSetSigner; + }); + await this.db.chainhook.sqlWriteTransaction(async sql => { + await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners); + }); + this.queuedCycleNumbers.delete(cycleNumber); + return; // Exit loop after successful fetch and database update + } catch (error) { + if (this.abortController.signal.aborted) { + return; // Updater service was stopped, ignore error and exit loop + } + logger.warn( + error, + `Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...` + ); + await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal); + } + } + } +} diff --git a/src/stacks-core-rpc/stacks-core-rpc-client.ts b/src/stacks-core-rpc/stacks-core-rpc-client.ts new file mode 100644 index 0000000..15a77c7 --- /dev/null +++ b/src/stacks-core-rpc/stacks-core-rpc-client.ts @@ -0,0 +1,39 @@ +import { ENV } from '../env'; + +export function getStacksNodeUrl(): string { + return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`; +} + +export interface PoxInfo { + first_burnchain_block_height: number; + reward_cycle_length: number; +} + +export async function fetchRpcPoxInfo(abortSignal: AbortSignal) { + const url = `${getStacksNodeUrl()}/v2/pox`; + const res = await fetch(url, { signal: abortSignal }); + const json = await res.json(); + return json as PoxInfo; +} + +export interface RpcStackerSetResponse { + stacker_set: { + rewarded_addresses: any[]; + start_cycle_state: { + missed_reward_slots: any[]; + }; + pox_ustx_threshold: number; + signers: { + signing_key: string; + stacked_amt: number; + weight: number; + }[]; + }; +} + +export async function fetchStackerSet(cycleNumber: number, abortSignal: AbortSignal) { + const url = `${getStacksNodeUrl()}/v3/stacker_set/${cycleNumber}`; + const res = await fetch(url, { signal: abortSignal }); + const json = await res.json(); + return json as RpcStackerSetResponse; +} From 376c9009db5c0cd421b1f355f3277c3883653efa Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 28 Oct 2024 15:46:08 +0100 Subject: [PATCH 2/3] feat: detect and fetch missing pox cycle signer sets --- src/pg/chainhook/chainhook-pg-store.ts | 36 +++++++++++++++++-- src/stacks-core-rpc/stacker-set-updater.ts | 19 ++++++++-- src/stacks-core-rpc/stacks-core-rpc-client.ts | 14 ++++++-- 3 files changed, 62 insertions(+), 7 deletions(-) diff --git a/src/pg/chainhook/chainhook-pg-store.ts b/src/pg/chainhook/chainhook-pg-store.ts index 8ef0586..2ba5f90 100644 --- a/src/pg/chainhook/chainhook-pg-store.ts +++ b/src/pg/chainhook/chainhook-pg-store.ts @@ -18,6 +18,7 @@ import { DbRewardSetSigner, } from '../types'; import { normalizeHexString, unixTimeMillisecondsToISO, unixTimeSecondsToISO } from '../../helpers'; +import { EventEmitter } from 'node:events'; const RejectReasonValidationFailed = 'VALIDATION_FAILED'; @@ -52,6 +53,8 @@ type MockBlockData = Extract< >['data']; export class ChainhookPgStore extends BasePgStoreModule { + readonly events = new EventEmitter<{ missingStackerSet: [{ cycleNumber: number }] }>(); + async processPayload(payload: StacksPayload): Promise { await this.sqlWriteTransaction(async sql => { for (const block of payload.rollback) { @@ -418,9 +421,38 @@ export class ChainhookPgStore extends BasePgStoreModule { } private async insertBlock(sql: PgSqlClient, dbBlock: DbBlock) { - await sql` - INSERT INTO blocks ${sql(dbBlock)} + // After the block is inserted, calculate the reward_cycle_number, then check if the reward_set_signers + // table contains any rows for the calculated cycle_number. + const result = await sql<{ cycle_number: number | null; reward_set_exists: boolean }[]>` + WITH inserted AS ( + INSERT INTO blocks ${sql(dbBlock)} + RETURNING burn_block_height + ), + cycle_number AS ( + SELECT FLOOR((inserted.burn_block_height - ct.first_burnchain_block_height) / ct.reward_cycle_length) AS cycle_number + FROM inserted, chain_tip AS ct + LIMIT 1 + ) + SELECT + cn.cycle_number, + EXISTS ( + SELECT 1 + FROM reward_set_signers + WHERE cycle_number = cn.cycle_number + LIMIT 1 + ) AS reward_set_exists + FROM cycle_number AS cn `; + const { cycle_number, reward_set_exists } = result[0]; + if (cycle_number === null) { + logger.warn(`Failed to calculate cycle number for block ${dbBlock.block_height}`); + } else if (cycle_number !== null && !reward_set_exists) { + logger.warn( + `Missing reward set signers for cycle ${cycle_number} in block ${dbBlock.block_height}` + ); + // Use setImmediate to ensure we break out of the current sql transaction within the async context + setImmediate(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number })); + } logger.info(`ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}`); } diff --git a/src/stacks-core-rpc/stacker-set-updater.ts b/src/stacks-core-rpc/stacker-set-updater.ts index 5fba8e4..559a75d 100644 --- a/src/stacks-core-rpc/stacker-set-updater.ts +++ b/src/stacks-core-rpc/stacker-set-updater.ts @@ -1,6 +1,6 @@ import { PgStore } from '../pg/pg-store'; import PQueue from 'p-queue'; -import { fetchStackerSet } from './stacks-core-rpc-client'; +import { fetchStackerSet, getStacksNodeUrl } from './stacks-core-rpc-client'; import { sleep } from '../helpers'; import { logger } from '@hirosystems/api-toolkit'; import { DbRewardSetSigner } from '../pg/types'; @@ -25,6 +25,9 @@ export class StackerSetUpdator { concurrency: FETCH_STACKER_SET_CONCURRENCY_LIMIT, autoStart: true, }); + this.db.chainhook.events.on('missingStackerSet', ({ cycleNumber }) => { + this.add({ cycleNumber }); + }); } async stop() { @@ -52,8 +55,15 @@ export class StackerSetUpdator { private async fetchStackerSet(cycleNumber: number) { while (!this.abortController.signal.aborted) { try { + logger.info(`Fetching stacker set for cycle ${cycleNumber} from stacks-core RPC ...`); const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal); - const dbRewardSetSigners = stackerSet.stacker_set.signers.map(entry => { + if (stackerSet.prePox4) { + logger.info(`Skipping stacker set update for cycle ${cycleNumber}, PoX-4 not yet active`); + this.queuedCycleNumbers.delete(cycleNumber); + return; // Exit loop after successful fetch + } + logger.info(`Fetched stacker set for cycle ${cycleNumber}, updating database ...`); + const dbRewardSetSigners = stackerSet.response.stacker_set.signers.map(entry => { const rewardSetSigner: DbRewardSetSigner = { cycle_number: cycleNumber, block_height: 0, @@ -67,8 +77,11 @@ export class StackerSetUpdator { await this.db.chainhook.sqlWriteTransaction(async sql => { await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners); }); + logger.info( + `Updated database with stacker set for cycle ${cycleNumber}, ${dbRewardSetSigners.length} signers` + ); this.queuedCycleNumbers.delete(cycleNumber); - return; // Exit loop after successful fetch and database update + return; // Exit loop after successful database update } catch (error) { if (this.abortController.signal.aborted) { return; // Updater service was stopped, ignore error and exit loop diff --git a/src/stacks-core-rpc/stacks-core-rpc-client.ts b/src/stacks-core-rpc/stacks-core-rpc-client.ts index 15a77c7..ea03b28 100644 --- a/src/stacks-core-rpc/stacks-core-rpc-client.ts +++ b/src/stacks-core-rpc/stacks-core-rpc-client.ts @@ -31,9 +31,19 @@ export interface RpcStackerSetResponse { }; } -export async function fetchStackerSet(cycleNumber: number, abortSignal: AbortSignal) { +export async function fetchStackerSet( + cycleNumber: number, + abortSignal: AbortSignal +): Promise<{ prePox4: true } | { prePox4: false; response: RpcStackerSetResponse }> { const url = `${getStacksNodeUrl()}/v3/stacker_set/${cycleNumber}`; const res = await fetch(url, { signal: abortSignal }); const json = await res.json(); - return json as RpcStackerSetResponse; + if (!res.ok) { + const err = JSON.stringify(json); + if (/Pre-PoX-4/i.test(err)) { + return { prePox4: true }; + } + throw new Error(`Failed to fetch stacker set for cycle ${cycleNumber}: ${err}`); + } + return { prePox4: false, response: json as RpcStackerSetResponse }; } From ceab2deccd46c927075184733e72e214db34bee3 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Mon, 28 Oct 2024 16:18:46 +0100 Subject: [PATCH 3/3] fix: re-queue stacker-set fetch jobs on failure --- src/stacks-core-rpc/stacker-set-updater.ts | 76 +++++++++++----------- 1 file changed, 38 insertions(+), 38 deletions(-) diff --git a/src/stacks-core-rpc/stacker-set-updater.ts b/src/stacks-core-rpc/stacker-set-updater.ts index 559a75d..ed9ac0b 100644 --- a/src/stacks-core-rpc/stacker-set-updater.ts +++ b/src/stacks-core-rpc/stacker-set-updater.ts @@ -45,7 +45,6 @@ export class StackerSetUpdator { .add(() => this.fetchStackerSet(cycleNumber)) .catch(error => { if (!this.abortController.signal.aborted) { - // Should never reach here unless there's a bug in error handling logger.error(error, `Unexpected stacker-set fetch queue error for cycle ${cycleNumber}`); this.queuedCycleNumbers.delete(cycleNumber); } @@ -53,45 +52,46 @@ export class StackerSetUpdator { } private async fetchStackerSet(cycleNumber: number) { - while (!this.abortController.signal.aborted) { - try { - logger.info(`Fetching stacker set for cycle ${cycleNumber} from stacks-core RPC ...`); - const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal); - if (stackerSet.prePox4) { - logger.info(`Skipping stacker set update for cycle ${cycleNumber}, PoX-4 not yet active`); - this.queuedCycleNumbers.delete(cycleNumber); - return; // Exit loop after successful fetch - } - logger.info(`Fetched stacker set for cycle ${cycleNumber}, updating database ...`); - const dbRewardSetSigners = stackerSet.response.stacker_set.signers.map(entry => { - const rewardSetSigner: DbRewardSetSigner = { - cycle_number: cycleNumber, - block_height: 0, - burn_block_height: 0, - signer_key: Buffer.from(entry.signing_key.replace(/^0x/, ''), 'hex'), - signer_weight: entry.weight, - signer_stacked_amount: entry.stacked_amt.toString(), - }; - return rewardSetSigner; - }); - await this.db.chainhook.sqlWriteTransaction(async sql => { - await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners); - }); - logger.info( - `Updated database with stacker set for cycle ${cycleNumber}, ${dbRewardSetSigners.length} signers` - ); + try { + logger.info(`Fetching stacker set for cycle ${cycleNumber} from stacks-core RPC ...`); + const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal); + if (stackerSet.prePox4) { + logger.info(`Skipping stacker set update for cycle ${cycleNumber}, PoX-4 not yet active`); this.queuedCycleNumbers.delete(cycleNumber); - return; // Exit loop after successful database update - } catch (error) { - if (this.abortController.signal.aborted) { - return; // Updater service was stopped, ignore error and exit loop - } - logger.warn( - error, - `Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...` - ); - await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal); + return; // Exit job successful fetch + } + logger.info(`Fetched stacker set for cycle ${cycleNumber}, updating database ...`); + const dbRewardSetSigners = stackerSet.response.stacker_set.signers.map(entry => { + const rewardSetSigner: DbRewardSetSigner = { + cycle_number: cycleNumber, + block_height: 0, + burn_block_height: 0, + signer_key: Buffer.from(entry.signing_key.replace(/^0x/, ''), 'hex'), + signer_weight: entry.weight, + signer_stacked_amount: entry.stacked_amt.toString(), + }; + return rewardSetSigner; + }); + await this.db.chainhook.sqlWriteTransaction(async sql => { + await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners); + }); + logger.info( + `Updated database with stacker set for cycle ${cycleNumber}, ${dbRewardSetSigners.length} signers` + ); + this.queuedCycleNumbers.delete(cycleNumber); + } catch (error) { + if (this.abortController.signal.aborted) { + return; // Updater service was stopped, ignore error and exit loop } + logger.warn( + error, + `Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...` + ); + await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal); + setImmediate(() => { + this.queuedCycleNumbers.delete(cycleNumber); + this.add({ cycleNumber }); + }); } } }