Skip to content

Commit

Permalink
feat: detect and fetch missing pox cycle signer sets (#16)
Browse files Browse the repository at this point in the history
* feat: stacker-set updater background service

* feat: detect and fetch missing pox cycle signer sets

* fix: re-queue stacker-set fetch jobs on failure
  • Loading branch information
zone117x authored Oct 28, 2024
1 parent b98dbb6 commit 501cfaa
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 21 deletions.
4 changes: 4 additions & 0 deletions migrations/1729684505754_reward_set_signers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('reward_set_signers', ['signer_key']);
pgm.createIndex('reward_set_signers', ['block_height']);
pgm.createIndex('reward_set_signers', ['cycle_number']);

pgm.createConstraint('reward_set_signers', 'reward_set_signers_cycle_unique', {
unique: ['signer_key', 'cycle_number'],
});
}
10 changes: 10 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { isProdEnv } from './helpers';
import { buildProfilerServer, logger, registerShutdownConfig } from '@hirosystems/api-toolkit';
import { closeChainhookServer, startChainhookServer } from './chainhook/server';
import { startPoxInfoUpdater } from './stacks-core-rpc/pox-info-updater';
import { StackerSetUpdator } from './stacks-core-rpc/stacker-set-updater';

/**
* Initializes background services. Only for `default` and `writeonly` run modes.
Expand All @@ -22,6 +23,15 @@ async function initBackgroundServices(db: PgStore) {
},
});

const stackerSetUpdater = new StackerSetUpdator({ db });
registerShutdownConfig({
name: 'StackerSet fetcher',
forceKillable: false,
handler: async () => {
await stackerSetUpdater.stop();
},
});

const server = await startChainhookServer({ db });
registerShutdownConfig({
name: 'Chainhook Server',
Expand Down
46 changes: 42 additions & 4 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
DbRewardSetSigner,
} from '../types';
import { normalizeHexString, unixTimeMillisecondsToISO, unixTimeSecondsToISO } from '../../helpers';
import { EventEmitter } from 'node:events';

const RejectReasonValidationFailed = 'VALIDATION_FAILED';

Expand Down Expand Up @@ -52,6 +53,8 @@ type MockBlockData = Extract<
>['data'];

export class ChainhookPgStore extends BasePgStoreModule {
readonly events = new EventEmitter<{ missingStackerSet: [{ cycleNumber: number }] }>();

async processPayload(payload: StacksPayload): Promise<void> {
await this.sqlWriteTransaction(async sql => {
for (const block of payload.rollback) {
Expand Down Expand Up @@ -418,9 +421,38 @@ export class ChainhookPgStore extends BasePgStoreModule {
}

private async insertBlock(sql: PgSqlClient, dbBlock: DbBlock) {
await sql`
INSERT INTO blocks ${sql(dbBlock)}
// After the block is inserted, calculate the reward_cycle_number, then check if the reward_set_signers
// table contains any rows for the calculated cycle_number.
const result = await sql<{ cycle_number: number | null; reward_set_exists: boolean }[]>`
WITH inserted AS (
INSERT INTO blocks ${sql(dbBlock)}
RETURNING burn_block_height
),
cycle_number AS (
SELECT FLOOR((inserted.burn_block_height - ct.first_burnchain_block_height) / ct.reward_cycle_length) AS cycle_number
FROM inserted, chain_tip AS ct
LIMIT 1
)
SELECT
cn.cycle_number,
EXISTS (
SELECT 1
FROM reward_set_signers
WHERE cycle_number = cn.cycle_number
LIMIT 1
) AS reward_set_exists
FROM cycle_number AS cn
`;
const { cycle_number, reward_set_exists } = result[0];
if (cycle_number === null) {
logger.warn(`Failed to calculate cycle number for block ${dbBlock.block_height}`);
} else if (cycle_number !== null && !reward_set_exists) {
logger.warn(
`Missing reward set signers for cycle ${cycle_number} in block ${dbBlock.block_height}`
);
// Use setImmediate to ensure we break out of the current sql transaction within the async context
setImmediate(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number }));
}
logger.info(`ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}`);
}

Expand All @@ -435,11 +467,17 @@ export class ChainhookPgStore extends BasePgStoreModule {
}
}

private async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) {
async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) {
for await (const batch of batchIterate(rewardSetSigners, 500)) {
await sql`
const result = await sql`
INSERT INTO reward_set_signers ${sql(batch)}
ON CONFLICT ON CONSTRAINT reward_set_signers_cycle_unique DO NOTHING
`;
if (result.count === 0) {
logger.warn(
`Skipped inserting duplicate reward set signers for cycle ${rewardSetSigners[0].cycle_number}`
);
}
}
}

Expand Down
18 changes: 1 addition & 17 deletions src/stacks-core-rpc/pox-info-updater.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { logger } from '@hirosystems/api-toolkit';
import { PgStore } from '../pg/pg-store';
import { sleep } from '../helpers';
import { ENV } from '../env';
import { fetchRpcPoxInfo, getStacksNodeUrl } from './stacks-core-rpc-client';

// How long to wait between PoX rpc fetches when the database already has PoX info
const POX_INFO_UPDATE_INTERVAL_MS = 30000;
Expand Down Expand Up @@ -61,19 +61,3 @@ async function runPoxInfoBackgroundJob(db: PgStore, abortSignal: AbortSignal) {
}
}
}

interface PoxInfo {
first_burnchain_block_height: number;
reward_cycle_length: number;
}

function getStacksNodeUrl(): string {
return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`;
}

async function fetchRpcPoxInfo(abortSignal: AbortSignal) {
const url = `${getStacksNodeUrl()}/v2/pox`;
const res = await fetch(url, { signal: abortSignal });
const json = await res.json();
return json as PoxInfo;
}
97 changes: 97 additions & 0 deletions src/stacks-core-rpc/stacker-set-updater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { PgStore } from '../pg/pg-store';
import PQueue from 'p-queue';
import { fetchStackerSet, getStacksNodeUrl } from './stacks-core-rpc-client';
import { sleep } from '../helpers';
import { logger } from '@hirosystems/api-toolkit';
import { DbRewardSetSigner } from '../pg/types';

// TODO: make this configurable
// How long to wait between retries when fetching fails
const FETCH_STACKER_SET_RETRY_INTERVAL_MS = 3000;

// TODO: make this configurable
const FETCH_STACKER_SET_CONCURRENCY_LIMIT = 2;

export class StackerSetUpdator {
private readonly queue: PQueue;
private readonly db: PgStore;
private readonly abortController: AbortController;
private readonly queuedCycleNumbers = new Set<number>();

constructor(args: { db: PgStore }) {
this.db = args.db;
this.abortController = new AbortController();
this.queue = new PQueue({
concurrency: FETCH_STACKER_SET_CONCURRENCY_LIMIT,
autoStart: true,
});
this.db.chainhook.events.on('missingStackerSet', ({ cycleNumber }) => {
this.add({ cycleNumber });
});
}

async stop() {
this.abortController.abort();
await this.queue.onIdle();
this.queue.pause();
}

add({ cycleNumber }: { cycleNumber: number }): void {
if (this.queuedCycleNumbers.has(cycleNumber) || this.abortController.signal.aborted) {
return;
}
this.queuedCycleNumbers.add(cycleNumber);
void this.queue
.add(() => this.fetchStackerSet(cycleNumber))
.catch(error => {
if (!this.abortController.signal.aborted) {
logger.error(error, `Unexpected stacker-set fetch queue error for cycle ${cycleNumber}`);
this.queuedCycleNumbers.delete(cycleNumber);
}
});
}

private async fetchStackerSet(cycleNumber: number) {
try {
logger.info(`Fetching stacker set for cycle ${cycleNumber} from stacks-core RPC ...`);
const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal);
if (stackerSet.prePox4) {
logger.info(`Skipping stacker set update for cycle ${cycleNumber}, PoX-4 not yet active`);
this.queuedCycleNumbers.delete(cycleNumber);
return; // Exit job successful fetch
}
logger.info(`Fetched stacker set for cycle ${cycleNumber}, updating database ...`);
const dbRewardSetSigners = stackerSet.response.stacker_set.signers.map(entry => {
const rewardSetSigner: DbRewardSetSigner = {
cycle_number: cycleNumber,
block_height: 0,
burn_block_height: 0,
signer_key: Buffer.from(entry.signing_key.replace(/^0x/, ''), 'hex'),
signer_weight: entry.weight,
signer_stacked_amount: entry.stacked_amt.toString(),
};
return rewardSetSigner;
});
await this.db.chainhook.sqlWriteTransaction(async sql => {
await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners);
});
logger.info(
`Updated database with stacker set for cycle ${cycleNumber}, ${dbRewardSetSigners.length} signers`
);
this.queuedCycleNumbers.delete(cycleNumber);
} catch (error) {
if (this.abortController.signal.aborted) {
return; // Updater service was stopped, ignore error and exit loop
}
logger.warn(
error,
`Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...`
);
await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal);
setImmediate(() => {
this.queuedCycleNumbers.delete(cycleNumber);
this.add({ cycleNumber });
});
}
}
}
49 changes: 49 additions & 0 deletions src/stacks-core-rpc/stacks-core-rpc-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { ENV } from '../env';

export function getStacksNodeUrl(): string {
return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`;
}

export interface PoxInfo {
first_burnchain_block_height: number;
reward_cycle_length: number;
}

export async function fetchRpcPoxInfo(abortSignal: AbortSignal) {
const url = `${getStacksNodeUrl()}/v2/pox`;
const res = await fetch(url, { signal: abortSignal });
const json = await res.json();
return json as PoxInfo;
}

export interface RpcStackerSetResponse {
stacker_set: {
rewarded_addresses: any[];
start_cycle_state: {
missed_reward_slots: any[];
};
pox_ustx_threshold: number;
signers: {
signing_key: string;
stacked_amt: number;
weight: number;
}[];
};
}

export async function fetchStackerSet(
cycleNumber: number,
abortSignal: AbortSignal
): Promise<{ prePox4: true } | { prePox4: false; response: RpcStackerSetResponse }> {
const url = `${getStacksNodeUrl()}/v3/stacker_set/${cycleNumber}`;
const res = await fetch(url, { signal: abortSignal });
const json = await res.json();
if (!res.ok) {
const err = JSON.stringify(json);
if (/Pre-PoX-4/i.test(err)) {
return { prePox4: true };
}
throw new Error(`Failed to fetch stacker set for cycle ${cycleNumber}: ${err}`);
}
return { prePox4: false, response: json as RpcStackerSetResponse };
}

0 comments on commit 501cfaa

Please sign in to comment.