chore: more logging in pg ingestion code (#36)
zone117x authored Nov 5, 2024
1 parent bb97910 commit 40a720c
Showing 4 changed files with 129 additions and 60 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -9,7 +9,7 @@
"build": "rimraf ./dist && tsc --project tsconfig.build.json",
"start": "node ./dist/src/index.js",
"start-ts": "ts-node ./src/index.ts",
"test": "jest --runInBand",
"test": "jest",
"test:unit": "jest --selectProjects unit-tests",
"test:db": "jest --selectProjects db-tests",
"test:api": "npm run test -- ./tests/api/",
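Dropping --runInBand lets the umbrella "test" script fan test files out across parallel Jest workers instead of running them serially in one process. If the db-tests project still needs serial execution against a shared database, the flag could be reinstated on just that script (a hypothetical tweak, not something this commit does):

    "test:db": "jest --selectProjects db-tests --runInBand",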
155 changes: 107 additions & 48 deletions src/pg/chainhook/chainhook-pg-store.ts
@@ -3,7 +3,7 @@ import {
BasePgStoreModule,
PgSqlClient,
batchIterate,
logger,
logger as defaultLogger,
stopwatch,
} from '@hirosystems/api-toolkit';
import { StacksEvent, StacksPayload } from '@hirosystems/chainhook-client';
@@ -55,6 +55,7 @@ type MockBlockData = Extract<

export class ChainhookPgStore extends BasePgStoreModule {
readonly events = new EventEmitter<{ missingStackerSet: [{ cycleNumber: number }] }>();
readonly logger = defaultLogger.child({ module: 'ChainhookPgStore' });

constructor(db: BasePgStore) {
super(db);
@@ -63,10 +64,10 @@ export class ChainhookPgStore extends BasePgStoreModule {
async processPayload(payload: StacksPayload): Promise<void> {
await this.sqlWriteTransaction(async sql => {
for (const block of payload.rollback) {
logger.info(`ChainhookPgStore rollback block ${block.block_identifier.index}`);
this.logger.info(`ChainhookPgStore rollback block ${block.block_identifier.index}`);
const time = stopwatch();
await this.updateStacksBlock(sql, block, 'rollback');
logger.info(
this.logger.info(
`ChainhookPgStore rollback block ${
block.block_identifier.index
} finished in ${time.getElapsedSeconds()}s`
@@ -78,16 +79,16 @@ }
}
for (const block of payload.apply) {
if (block.block_identifier.index <= (await this.getLastIngestedBlockHeight())) {
logger.info(
this.logger.info(
`ChainhookPgStore skipping previously ingested block ${block.block_identifier.index}`
);
continue;
}
logger.info(`ChainhookPgStore apply block ${block.block_identifier.index}`);
this.logger.info(`ChainhookPgStore apply block ${block.block_identifier.index}`);
const time = stopwatch();
await this.updateStacksBlock(sql, block, 'apply');
await this.updateChainTipBlockHeight(block.block_identifier.index);
logger.info(
this.logger.info(
`ChainhookPgStore apply block ${
block.block_identifier.index
} finished in ${time.getElapsedSeconds()}s`
@@ -98,7 +99,7 @@
if (event.payload.type === 'SignerMessage') {
await this.applySignerMessageEvent(sql, event);
} else {
logger.error(`Unknown chainhook payload event type: ${event.payload.type}`);
this.logger.error(`Unknown chainhook payload event type: ${event.payload.type}`);
}
}
});
@@ -134,7 +135,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
break;
}
case 'BlockPushed': {
logger.info(`Ignoring BlockPushed StackerDB event`);
this.logger.info(`Ignoring BlockPushed StackerDB event`);
break;
}
case 'MockProposal': {
@@ -166,7 +167,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
break;
}
default: {
logger.error(event.payload.data, `Unknown StackerDB event type`);
this.logger.error(event.payload.data, `Unknown StackerDB event type`);
break;
}
}
@@ -196,32 +197,47 @@ export class ChainhookPgStore extends BasePgStoreModule {
network_id: messageData.mock_proposal.peer_info.network_id,
index_block_hash: normalizeHexString(messageData.mock_proposal.peer_info.index_block_hash),
};
const result = await sql`
const mockBlockInsertResult = await sql`
INSERT INTO mock_blocks ${sql(dbMockBlock)}
ON CONFLICT ON CONSTRAINT mock_blocks_idb_unique DO NOTHING
`;

if (result.count === 0) {
logger.info(
if (mockBlockInsertResult.count === 0) {
this.logger.info(
`Skipped inserting duplicate mock block height=${dbMockBlock.stacks_tip_height}, hash=${dbMockBlock.stacks_tip}`
);
return;
}
} else {
this.logger.info(
`ChainhookPgStore apply mock_block height=${dbMockBlock.stacks_tip_height}, hash=${dbMockBlock.stacks_tip}`
);

for (const batch of batchIterate(messageData.mock_signatures, 500)) {
const sigs = batch.map(sig => {
const dbSig: DbMockBlockSignerSignature = {
signer_key: normalizeHexString(sig.pubkey),
signer_signature: normalizeHexString(sig.signature),
stacks_tip: sig.mock_proposal.peer_info.stacks_tip,
stacks_tip_height: sig.mock_proposal.peer_info.stacks_tip_height,
index_block_hash: sig.mock_proposal.peer_info.index_block_hash,
};
return dbSig;
});
await sql`
INSERT INTO mock_block_signer_signatures ${sql(sigs)}
`;
let mockBlockSigInsertCount = 0;
for (const batch of batchIterate(messageData.mock_signatures, 500)) {
const sigs = batch.map(sig => {
const dbSig: DbMockBlockSignerSignature = {
signer_key: normalizeHexString(sig.pubkey),
signer_signature: normalizeHexString(sig.signature),
stacks_tip: sig.mock_proposal.peer_info.stacks_tip,
stacks_tip_height: sig.mock_proposal.peer_info.stacks_tip_height,
index_block_hash: sig.mock_proposal.peer_info.index_block_hash,
};
return dbSig;
});
// TODO: add unique constraint here
const sigInsertResult = await sql`
INSERT INTO mock_block_signer_signatures ${sql(sigs)}
`;
mockBlockSigInsertCount += sigInsertResult.count;
}
if (mockBlockSigInsertCount === 0) {
this.logger.info(
`Skipped inserting duplicate mock block signer signatures for block ${dbMockBlock.stacks_tip_height}`
);
} else {
this.logger.info(
`ChainhookPgStore apply mock_block_signer_signatures, block=${dbMockBlock.stacks_tip_height}, count=${mockBlockSigInsertCount}`
);
}
}
}

@@ -256,9 +272,13 @@ export class ChainhookPgStore extends BasePgStoreModule {
ON CONFLICT ON CONSTRAINT mock_signatures_signer_key_idb_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
this.logger.info(
`Skipped inserting duplicate mock signature height=${dbMockSignature.stacks_tip_height}, hash=${dbMockSignature.stacks_tip}, signer=${dbMockSignature.signer_key}`
);
} else {
this.logger.info(
`ChainhookPgStore apply mock_signature height=${dbMockSignature.stacks_tip_height}, hash=${dbMockSignature.stacks_tip}, signer=${dbMockSignature.signer_key}`
);
}
}

@@ -285,9 +305,13 @@ export class ChainhookPgStore extends BasePgStoreModule {
ON CONFLICT ON CONSTRAINT mock_proposals_idb_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
this.logger.info(
`Skipped inserting duplicate mock proposal height=${dbMockProposal.stacks_tip_height}, hash=${dbMockProposal.stacks_tip}`
);
} else {
this.logger.info(
`ChainhookPgStore apply mock_proposal height=${dbMockProposal.stacks_tip_height}, hash=${dbMockProposal.stacks_tip}`
);
}
}

@@ -312,9 +336,13 @@ export class ChainhookPgStore extends BasePgStoreModule {
ON CONFLICT ON CONSTRAINT block_proposals_block_hash_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
this.logger.info(
`Skipped inserting duplicate block proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}`
);
} else {
this.logger.info(
`ChainhookPgStore apply block_proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}`
);
}
}

@@ -325,7 +353,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
messageData: BlockResponseData
) {
if (messageData.type !== 'Accepted' && messageData.type !== 'Rejected') {
logger.error(messageData, `Unexpected BlockResponse type`);
this.logger.error(messageData, `Unexpected BlockResponse type`);
}
const accepted = messageData.type === 'Accepted';

@@ -359,9 +387,13 @@ export class ChainhookPgStore extends BasePgStoreModule {
`;

if (result.count === 0) {
logger.info(
this.logger.info(
`Skipped inserting duplicate block response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}`
);
} else {
this.logger.info(
`ChainhookPgStore apply block_response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}`
);
}
}

@@ -401,7 +433,7 @@ };
};
return dbSig;
});
if (dbSignerSignatures && dbSignerSignatures.length > 0) {
if (dbSignerSignatures) {
await this.insertBlockSignerSignatures(sql, dbSignerSignatures);
}

@@ -428,12 +460,13 @@ export class ChainhookPgStore extends BasePgStoreModule {
private async insertBlock(sql: PgSqlClient, dbBlock: DbBlock) {
// Skip pre-nakamoto blocks
if (!dbBlock.is_nakamoto_block) {
logger.info(
this.logger.info(
`ChainhookPgStore skipping apply for pre-nakamoto block ${dbBlock.block_height} ${dbBlock.block_hash}`
);
} else {
// After the block is inserted, calculate the reward_cycle_number, then check if the reward_set_signers
// table contains any rows for the calculated cycle_number.
// TODO: add unique constraint here
const result = await sql<{ cycle_number: number | null; reward_set_exists: boolean }[]>`
WITH inserted AS (
INSERT INTO blocks ${sql(dbBlock)}
@@ -456,40 +489,64 @@ export class ChainhookPgStore extends BasePgStoreModule {
`;
const { cycle_number, reward_set_exists } = result[0];
if (cycle_number === null) {
logger.warn(`Failed to calculate cycle number for block ${dbBlock.block_height}`);
this.logger.warn(`Failed to calculate cycle number for block ${dbBlock.block_height}`);
} else if (cycle_number !== null && !reward_set_exists) {
logger.warn(
this.logger.warn(
`Missing reward set signers for cycle ${cycle_number} in block ${dbBlock.block_height}`
);
// Use setImmediate to ensure we break out of the current sql transaction within the async context
setImmediate(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number }));
}
logger.info(`ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}`);
this.logger.info(
`ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}`
);
}
}

private async insertBlockSignerSignatures(
sql: PgSqlClient,
signerSigs: DbBlockSignerSignature[]
) {
if (signerSigs.length === 0) {
// nothing to insert
return;
}
let insertCount = 0;
for await (const batch of batchIterate(signerSigs, 500)) {
await sql`
// TODO: add unique constraint here
const result = await sql`
INSERT INTO block_signer_signatures ${sql(batch)}
`;
insertCount += result.count;
}
if (insertCount === 0) {
this.logger.info(
`Skipped inserting duplicate block signer signatures for block ${signerSigs[0].block_height}`
);
} else {
this.logger.info(
`ChainhookPgStore apply block_signer_signatures, block=${signerSigs[0].block_height}, count=${insertCount}`
);
}
}

async insertRewardSetSigners(sql: PgSqlClient, rewardSetSigners: DbRewardSetSigner[]) {
let insertCount = 0;
for await (const batch of batchIterate(rewardSetSigners, 500)) {
const result = await sql`
INSERT INTO reward_set_signers ${sql(batch)}
ON CONFLICT ON CONSTRAINT reward_set_signers_cycle_unique DO NOTHING
`;
if (result.count === 0) {
logger.warn(
`Skipped inserting duplicate reward set signers for cycle ${rewardSetSigners[0].cycle_number}`
);
}
insertCount += result.count;
}
if (insertCount === 0) {
this.logger.info(
`Skipped inserting duplicate reward set signers for cycle ${rewardSetSigners[0].cycle_number}`
);
} else {
this.logger.info(
`ChainhookPgStore apply reward_set_signers, cycle=${rewardSetSigners[0].cycle_number}, count=${insertCount}`
);
}
}

@@ -504,17 +561,19 @@ export class ChainhookPgStore extends BasePgStoreModule {
const res = await sql`
DELETE FROM blocks WHERE block_height = ${blockHeight}
`;
logger.info(`ChainhookPgStore rollback block ${blockHeight}`);
this.logger.info(`ChainhookPgStore rollback block ${blockHeight}`);
if (res.count !== 1) {
logger.warn(`Unexpected number of rows deleted for block ${blockHeight}, ${res.count} rows`);
this.logger.warn(
`Unexpected number of rows deleted for block ${blockHeight}, ${res.count} rows`
);
}
}

private async rollBackBlockSignerSignatures(sql: PgSqlClient, blockHeight: number) {
const res = await sql`
DELETE FROM block_signer_signatures WHERE block_height = ${blockHeight}
`;
logger.info(
this.logger.info(
`ChainhookPgStore rollback block signer signatures for block ${blockHeight}, deleted ${res.count} rows`
);
}
@@ -523,7 +582,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
const res = await sql`
DELETE FROM reward_set_signers WHERE block_height = ${blockHeight}
`;
logger.info(
this.logger.info(
`ChainhookPgStore rollback reward set signers for block ${blockHeight}, deleted ${res.count} rows`
);
}
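The core of this change is the switch from the shared api-toolkit logger to a per-module child logger, so every line emitted by this store is tagged with its module. A minimal sketch of the pattern, assuming the pino-style child() API that the api-toolkit logger exposes:

    import { logger as defaultLogger } from '@hirosystems/api-toolkit';

    // child() returns a logger that inherits the parent's level and transports
    // but stamps every emitted line with the bound fields.
    const log = defaultLogger.child({ module: 'ChainhookPgStore' });

    log.info('apply block 42');
    // emits something like: {"level":30,"module":"ChainhookPgStore","msg":"apply block 42"}

The two "TODO: add unique constraint here" comments flag that mock_block_signer_signatures and block_signer_signatures are inserted without an ON CONFLICT guard, unlike the other tables, so the new duplicate-skip logging leans on the raw insert count; until such a constraint exists, a plain INSERT of a non-empty batch either inserts every row or raises, so the count-of-zero branch mostly anticipates the follow-up. A hypothetical migration (the constraint name and column set are assumptions, not from this commit) would let those inserts mirror the existing pattern:

    // Hypothetical migration; the constraint name and columns are illustrative only.
    await sql`
      ALTER TABLE block_signer_signatures
        ADD CONSTRAINT block_signer_signatures_height_signer_unique
        UNIQUE (block_height, signer_key)
    `;
    // The batched insert could then skip duplicates like the other tables do:
    const result = await sql`
      INSERT INTO block_signer_signatures ${sql(batch)}
      ON CONFLICT ON CONSTRAINT block_signer_signatures_height_signer_unique DO NOTHING
    `;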
24 changes: 13 additions & 11 deletions tests/db/endpoints.test.ts
@@ -26,17 +26,6 @@ describe('Postgres ingestion tests', () => {
apiServer = await buildApiServer({ db });
await apiServer.listen({ port: 0, host: '127.0.0.1' });

// insert chainhook-payloads dump
const payloadDumpFile = './tests/dumps/dump-chainhook-payloads-2024-11-02.ndjson.gz';
const rl = readline.createInterface({
input: fs.createReadStream(payloadDumpFile).pipe(zlib.createGunzip()),
crlfDelay: Infinity,
});
for await (const line of rl) {
await db.chainhook.processPayload(JSON.parse(line) as StacksPayload);
}
rl.close();

// insert pox-info dump
const poxInfoDump = JSON.parse(
fs.readFileSync('./tests/dumps/dump-pox-info-2024-11-02.json', 'utf8')
@@ -51,6 +40,19 @@
db.sql,
rpcStackerSetToDbRewardSetSigners(stackerSetDump, 72)
);

// insert chainhook-payloads dump
const spyInfoLog = jest.spyOn(db.chainhook.logger, 'info').mockImplementation(() => {}); // Suppress noisy logs during bulk insertion test
const payloadDumpFile = './tests/dumps/dump-chainhook-payloads-2024-11-02.ndjson.gz';
const rl = readline.createInterface({
input: fs.createReadStream(payloadDumpFile).pipe(zlib.createGunzip()),
crlfDelay: Infinity,
});
for await (const line of rl) {
await db.chainhook.processPayload(JSON.parse(line) as StacksPayload);
}
rl.close();
spyInfoLog.mockRestore();
});

afterAll(async () => {
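Because processPayload now logs once per applied block, replaying the full payload dump would flood the test output; the spy above silences info-level logs during bulk ingestion and restores them afterward. If ingestion could throw, a try/finally variant would guarantee the spy is restored (a sketch, not the committed code):

    const spyInfoLog = jest.spyOn(db.chainhook.logger, 'info').mockImplementation(() => {});
    try {
      for await (const line of rl) {
        await db.chainhook.processPayload(JSON.parse(line) as StacksPayload);
      }
    } finally {
      rl.close();
      spyInfoLog.mockRestore();
    }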