Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: detect and fetch missing pox cycle signer sets #16

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions migrations/1729684505754_reward_set_signers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('reward_set_signers', ['signer_key']);
pgm.createIndex('reward_set_signers', ['block_height']);
pgm.createIndex('reward_set_signers', ['cycle_number']);

pgm.createConstraint('reward_set_signers', 'reward_set_signers_cycle_unique', {
unique: ['signer_key', 'cycle_number'],
});
}
10 changes: 10 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { isProdEnv } from './helpers';
import { buildProfilerServer, logger, registerShutdownConfig } from '@hirosystems/api-toolkit';
import { closeChainhookServer, startChainhookServer } from './chainhook/server';
import { startPoxInfoUpdater } from './stacks-core-rpc/pox-info-updater';
import { StackerSetUpdator } from './stacks-core-rpc/stacker-set-updater';

/**
* Initializes background services. Only for `default` and `writeonly` run modes.
Expand All @@ -22,6 +23,15 @@ async function initBackgroundServices(db: PgStore) {
},
});

const stackerSetUpdater = new StackerSetUpdator({ db });
registerShutdownConfig({
name: 'StackerSet fetcher',
forceKillable: false,
handler: async () => {
await stackerSetUpdater.stop();
},
});

const server = await startChainhookServer({ db });
registerShutdownConfig({
name: 'Chainhook Server',
Expand Down
46 changes: 42 additions & 4 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
DbRewardSetSigner,
} from '../types';
import { normalizeHexString, unixTimeMillisecondsToISO, unixTimeSecondsToISO } from '../../helpers';
import { EventEmitter } from 'node:events';

const RejectReasonValidationFailed = 'VALIDATION_FAILED';

Expand Down Expand Up @@ -52,6 +53,8 @@ type MockBlockData = Extract<
>['data'];

export class ChainhookPgStore extends BasePgStoreModule {
readonly events = new EventEmitter<{ missingStackerSet: [{ cycleNumber: number }] }>();

async processPayload(payload: StacksPayload): Promise<void> {
await this.sqlWriteTransaction(async sql => {
for (const block of payload.rollback) {
Expand Down Expand Up @@ -418,9 +421,38 @@ export class ChainhookPgStore extends BasePgStoreModule {
}

private async insertBlock(sql: PgSqlClient, dbBlock: DbBlock) {
await sql`
INSERT INTO blocks ${sql(dbBlock)}
// After the block is inserted, calculate the reward_cycle_number, then check if the reward_set_signers
// table contains any rows for the calculated cycle_number.
const result = await sql<{ cycle_number: number | null; reward_set_exists: boolean }[]>`
WITH inserted AS (
INSERT INTO blocks ${sql(dbBlock)}
RETURNING burn_block_height
),
cycle_number AS (
SELECT FLOOR((inserted.burn_block_height - ct.first_burnchain_block_height) / ct.reward_cycle_length) AS cycle_number
FROM inserted, chain_tip AS ct
LIMIT 1
)
SELECT
cn.cycle_number,
EXISTS (
SELECT 1
FROM reward_set_signers
WHERE cycle_number = cn.cycle_number
LIMIT 1
) AS reward_set_exists
FROM cycle_number AS cn
`;
const { cycle_number, reward_set_exists } = result[0];
if (cycle_number === null) {
logger.warn(`Failed to calculate cycle number for block ${dbBlock.block_height}`);
} else if (cycle_number !== null && !reward_set_exists) {
logger.warn(
`Missing reward set signers for cycle ${cycle_number} in block ${dbBlock.block_height}`
);
// Use setImmediate to ensure we break out of the current sql transaction within the async context
setImmediate(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number }));
}
logger.info(`ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}`);
}

Expand All @@ -435,11 +467,17 @@ export class ChainhookPgStore extends BasePgStoreModule {
}
}

private async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) {
async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) {
for await (const batch of batchIterate(rewardSetSigners, 500)) {
await sql`
const result = await sql`
INSERT INTO reward_set_signers ${sql(batch)}
ON CONFLICT ON CONSTRAINT reward_set_signers_cycle_unique DO NOTHING
`;
if (result.count === 0) {
logger.warn(
`Skipped inserting duplicate reward set signers for cycle ${rewardSetSigners[0].cycle_number}`
);
}
}
}

Expand Down
18 changes: 1 addition & 17 deletions src/stacks-core-rpc/pox-info-updater.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { logger } from '@hirosystems/api-toolkit';
import { PgStore } from '../pg/pg-store';
import { sleep } from '../helpers';
import { ENV } from '../env';
import { fetchRpcPoxInfo, getStacksNodeUrl } from './stacks-core-rpc-client';

// How long to wait between PoX rpc fetches when the database already has PoX info
const POX_INFO_UPDATE_INTERVAL_MS = 30000;
Expand Down Expand Up @@ -61,19 +61,3 @@ async function runPoxInfoBackgroundJob(db: PgStore, abortSignal: AbortSignal) {
}
}
}

interface PoxInfo {
first_burnchain_block_height: number;
reward_cycle_length: number;
}

function getStacksNodeUrl(): string {
return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`;
}

async function fetchRpcPoxInfo(abortSignal: AbortSignal) {
const url = `${getStacksNodeUrl()}/v2/pox`;
const res = await fetch(url, { signal: abortSignal });
const json = await res.json();
return json as PoxInfo;
}
97 changes: 97 additions & 0 deletions src/stacks-core-rpc/stacker-set-updater.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import { PgStore } from '../pg/pg-store';
import PQueue from 'p-queue';
import { fetchStackerSet, getStacksNodeUrl } from './stacks-core-rpc-client';
import { sleep } from '../helpers';
import { logger } from '@hirosystems/api-toolkit';
import { DbRewardSetSigner } from '../pg/types';

// TODO: make this configurable
// How long to wait between retries when fetching fails
const FETCH_STACKER_SET_RETRY_INTERVAL_MS = 3000;

// TODO: make this configurable
const FETCH_STACKER_SET_CONCURRENCY_LIMIT = 2;

export class StackerSetUpdator {
private readonly queue: PQueue;
private readonly db: PgStore;
private readonly abortController: AbortController;
private readonly queuedCycleNumbers = new Set<number>();

constructor(args: { db: PgStore }) {
this.db = args.db;
this.abortController = new AbortController();
this.queue = new PQueue({
concurrency: FETCH_STACKER_SET_CONCURRENCY_LIMIT,
autoStart: true,
});
this.db.chainhook.events.on('missingStackerSet', ({ cycleNumber }) => {
this.add({ cycleNumber });
});
}

async stop() {
this.abortController.abort();
await this.queue.onIdle();
this.queue.pause();
}

add({ cycleNumber }: { cycleNumber: number }): void {
if (this.queuedCycleNumbers.has(cycleNumber) || this.abortController.signal.aborted) {
return;
}
this.queuedCycleNumbers.add(cycleNumber);
void this.queue
.add(() => this.fetchStackerSet(cycleNumber))
.catch(error => {
if (!this.abortController.signal.aborted) {
// Should never reach here unless there's a bug in error handling
logger.error(error, `Unexpected stacker-set fetch queue error for cycle ${cycleNumber}`);
this.queuedCycleNumbers.delete(cycleNumber);
}
});
}

private async fetchStackerSet(cycleNumber: number) {
while (!this.abortController.signal.aborted) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of an infinite loop that would keep a job alive indefinitely, could we use logic to re-add this cycle fetch to the back of the queue if it fails and let the PQueue handle the retry? With this infinite loop, if you have a cycle that gets stuck and you add more cycles to the queue there could be a situation where you stop processing new cycles altogether if you hit your CONCURRENCY_LIMIT

Perhaps not super urgent for now, though, could be done in another PR but I think it's something we should keep in mind

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed!

try {
logger.info(`Fetching stacker set for cycle ${cycleNumber} from stacks-core RPC ...`);
const stackerSet = await fetchStackerSet(cycleNumber, this.abortController.signal);
if (stackerSet.prePox4) {
logger.info(`Skipping stacker set update for cycle ${cycleNumber}, PoX-4 not yet active`);
this.queuedCycleNumbers.delete(cycleNumber);
return; // Exit loop after successful fetch
}
logger.info(`Fetched stacker set for cycle ${cycleNumber}, updating database ...`);
const dbRewardSetSigners = stackerSet.response.stacker_set.signers.map(entry => {
const rewardSetSigner: DbRewardSetSigner = {
cycle_number: cycleNumber,
block_height: 0,
burn_block_height: 0,
signer_key: Buffer.from(entry.signing_key.replace(/^0x/, ''), 'hex'),
signer_weight: entry.weight,
signer_stacked_amount: entry.stacked_amt.toString(),
};
return rewardSetSigner;
});
await this.db.chainhook.sqlWriteTransaction(async sql => {
await this.db.chainhook.insertRewardSetSigners(sql, dbRewardSetSigners);
});
logger.info(
`Updated database with stacker set for cycle ${cycleNumber}, ${dbRewardSetSigners.length} signers`
);
this.queuedCycleNumbers.delete(cycleNumber);
return; // Exit loop after successful database update
} catch (error) {
if (this.abortController.signal.aborted) {
return; // Updater service was stopped, ignore error and exit loop
}
logger.warn(
error,
`Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...`
);
await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal);
}
}
}
}
49 changes: 49 additions & 0 deletions src/stacks-core-rpc/stacks-core-rpc-client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { ENV } from '../env';

export function getStacksNodeUrl(): string {
return `http://${ENV.STACKS_NODE_RPC_HOST}:${ENV.STACKS_NODE_RPC_PORT}`;
}

export interface PoxInfo {
first_burnchain_block_height: number;
reward_cycle_length: number;
}

export async function fetchRpcPoxInfo(abortSignal: AbortSignal) {
const url = `${getStacksNodeUrl()}/v2/pox`;
const res = await fetch(url, { signal: abortSignal });
const json = await res.json();
return json as PoxInfo;
}

export interface RpcStackerSetResponse {
stacker_set: {
rewarded_addresses: any[];
start_cycle_state: {
missed_reward_slots: any[];
};
pox_ustx_threshold: number;
signers: {
signing_key: string;
stacked_amt: number;
weight: number;
}[];
};
}

export async function fetchStackerSet(
cycleNumber: number,
abortSignal: AbortSignal
): Promise<{ prePox4: true } | { prePox4: false; response: RpcStackerSetResponse }> {
const url = `${getStacksNodeUrl()}/v3/stacker_set/${cycleNumber}`;
const res = await fetch(url, { signal: abortSignal });
const json = await res.json();
if (!res.ok) {
const err = JSON.stringify(json);
if (/Pre-PoX-4/i.test(err)) {
return { prePox4: true };
}
throw new Error(`Failed to fetch stacker set for cycle ${cycleNumber}: ${err}`);
}
return { prePox4: false, response: json as RpcStackerSetResponse };
}
Loading