Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix/dupe msg handling #14

Merged
merged 6 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions migrations/1729684505755_block_proposals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('block_proposals', ['block_hash']);
pgm.createIndex('block_proposals', ['index_block_hash']);
pgm.createIndex('block_proposals', ['reward_cycle']);

pgm.createConstraint('block_proposals', 'block_proposals_block_hash_unique', {
unique: ['block_hash'],
});
}
4 changes: 4 additions & 0 deletions migrations/1729684505756_block_responses.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('block_responses', ['received_at']);
pgm.createIndex('block_responses', ['signer_sighash']);
pgm.createIndex('block_responses', ['accepted']);

pgm.createConstraint('block_responses', 'block_responses_signer_key_sighash_unique', {
unique: ['signer_key', 'signer_sighash'],
});
}
4 changes: 4 additions & 0 deletions migrations/1729684505758_mock_proposals.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_proposals', ['stacks_tip']);
pgm.createIndex('mock_proposals', ['index_block_hash']);
pgm.createIndex('mock_proposals', ['burn_block_height']);

pgm.createConstraint('mock_proposals', 'mock_proposals_idb_unique', {
unique: ['index_block_hash'],
});
}
4 changes: 4 additions & 0 deletions migrations/1729684505759_mock_signature.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_signatures', ['stacks_tip']);
pgm.createIndex('mock_signatures', ['index_block_hash']);
pgm.createIndex('mock_signatures', ['burn_block_height']);

pgm.createConstraint('mock_signatures', 'mock_signatures_signer_key_idb_unique', {
unique: ['signer_key', 'index_block_hash'],
});
}
8 changes: 8 additions & 0 deletions migrations/1729684505760_mock_blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_blocks', ['index_block_hash']);
pgm.createIndex('mock_blocks', ['burn_block_height']);

pgm.createConstraint('mock_blocks', 'mock_blocks_idb_unique', {
unique: ['index_block_hash'],
});

// Mock block signer signatures
pgm.createTable('mock_block_signer_signatures', {
id: {
Expand Down Expand Up @@ -99,4 +103,8 @@ export function up(pgm: MigrationBuilder): void {
pgm.createIndex('mock_block_signer_signatures', ['stacks_tip']);
pgm.createIndex('mock_block_signer_signatures', ['stacks_tip_height']);
pgm.createIndex('mock_block_signer_signatures', ['index_block_hash']);

pgm.createConstraint('mock_block_signer_signatures', 'mock_block_signers_idb_unique', {
unique: ['index_block_hash', 'signer_key'],
});
}
47 changes: 40 additions & 7 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,18 @@ export class ChainhookPgStore extends BasePgStoreModule {
network_id: messageData.mock_proposal.peer_info.network_id,
index_block_hash: normalizeHexString(messageData.mock_proposal.peer_info.index_block_hash),
};
await sql`
const result = await sql`
INSERT INTO mock_blocks ${sql(dbMockBlock)}
ON CONFLICT ON CONSTRAINT mock_blocks_idb_unique DO NOTHING
`;

if (result.count === 0) {
logger.info(
`Skipped inserting duplicate mock block height=${dbMockBlock.stacks_tip_height}, hash=${dbMockBlock.stacks_tip}`
);
return;
}

for (const batch of batchIterate(messageData.mock_signatures, 500)) {
const sigs = batch.map(sig => {
const dbSig: DbMockBlockSignerSignature = {
Expand Down Expand Up @@ -235,9 +243,15 @@ export class ChainhookPgStore extends BasePgStoreModule {
// Metadata fields
metadata_server_version: messageData.metadata.server_version,
};
await sql`
const result = await sql`
INSERT INTO mock_signatures ${sql(dbMockSignature)}
ON CONFLICT ON CONSTRAINT mock_signatures_signer_key_idb_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
`Skipped inserting duplicate mock signature height=${dbMockSignature.stacks_tip_height}, hash=${dbMockSignature.stacks_tip}, signer=${dbMockSignature.signer_key}`
);
}
}

private async applyMockProposal(
Expand All @@ -258,9 +272,15 @@ export class ChainhookPgStore extends BasePgStoreModule {
network_id: messageData.network_id,
index_block_hash: normalizeHexString(messageData.index_block_hash),
};
await sql`
const result = await sql`
INSERT INTO mock_proposals ${sql(dbMockProposal)}
ON CONFLICT ON CONSTRAINT mock_proposals_idb_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
`Skipped inserting duplicate mock proposal height=${dbMockProposal.stacks_tip_height}, hash=${dbMockProposal.stacks_tip}`
);
}
}

private async applyBlockProposal(
Expand All @@ -279,9 +299,15 @@ export class ChainhookPgStore extends BasePgStoreModule {
reward_cycle: messageData.reward_cycle,
burn_block_height: messageData.burn_height,
};
await sql`
const result = await sql`
INSERT INTO block_proposals ${sql(dbBlockProposal)}
ON CONFLICT ON CONSTRAINT block_proposals_block_hash_unique DO NOTHING
`;
if (result.count === 0) {
logger.info(
`Skipped inserting duplicate block proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}`
);
}
}

private async applyBlockResponse(
Expand All @@ -307,7 +333,7 @@ export class ChainhookPgStore extends BasePgStoreModule {
}
}

const dbBlockProposal: DbBlockResponse = {
const dbBlockResponse: DbBlockResponse = {
received_at: unixTimeMillisecondsToISO(receivedAt),
signer_key: normalizeHexString(signerPubkey),
accepted: accepted,
Expand All @@ -319,9 +345,16 @@ export class ChainhookPgStore extends BasePgStoreModule {
reject_code: rejectCode,
chain_id: accepted ? null : messageData.data.chain_id,
};
await sql`
INSERT INTO block_responses ${sql(dbBlockProposal)}
const result = await sql`
INSERT INTO block_responses ${sql(dbBlockResponse)}
ON CONFLICT ON CONSTRAINT block_responses_signer_key_sighash_unique DO NOTHING
`;

if (result.count === 0) {
logger.info(
`Skipped inserting duplicate block response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}`
);
}
}

private async updateStacksBlock(
Expand Down
72 changes: 17 additions & 55 deletions src/pg/pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,6 @@ export class PgStore extends BasePgStore {
// The `blocks` table (and its associated block_signer_signatures table) is the source of truth that is
// never missing blocks and does not contain duplicate rows per block.
//
// The block_proposals and block_responses tables can have duplicate rows. Duplicates can be detected in
// block_proposals using the block_hash column. Duplicates can be detected in block_responses by looking
// at (signer_key, signer_sighash). For both tables filter duplicates by using only the first row (the
// oldest id column).
//
// Each block has a known set of signer_keys which can be determined by first looking up the block's
// cycle_number from the `block_proposals` table matching on block_hash, then using cycle_number to look
// up the set of signer_keys from the reward_set_signers table (matching cycle_number with reward_cycle).
Expand Down Expand Up @@ -154,16 +149,6 @@ export class PgStore extends BasePgStore {
LIMIT ${limit}
OFFSET ${offset}
),
filtered_block_proposals AS (
SELECT DISTINCT ON (block_hash) id, block_hash, received_at, reward_cycle AS cycle_number
FROM block_proposals
ORDER BY block_hash, id
),
filtered_block_responses AS (
SELECT DISTINCT ON (signer_key, signer_sighash) *
FROM block_responses
ORDER BY signer_key, signer_sighash, id
),
block_signers AS (
SELECT
lb.id AS block_id,
Expand All @@ -172,7 +157,7 @@ export class PgStore extends BasePgStore {
lb.block_hash,
lb.index_block_hash,
lb.burn_block_height,
bp.cycle_number,
bp.reward_cycle as cycle_number,
bp.received_at AS block_proposal_time_ms,
rs.signer_key,
COALESCE(rs.signer_weight, 0) AS signer_weight,
Expand All @@ -185,10 +170,10 @@ export class PgStore extends BasePgStore {
END AS signer_status,
EXTRACT(MILLISECOND FROM (fbr.received_at - bp.received_at)) AS response_time_ms
FROM latest_blocks lb
LEFT JOIN filtered_block_proposals bp ON lb.block_hash = bp.block_hash
LEFT JOIN reward_set_signers rs ON bp.cycle_number = rs.cycle_number
LEFT JOIN block_proposals bp ON lb.block_hash = bp.block_hash
LEFT JOIN reward_set_signers rs ON bp.reward_cycle = rs.cycle_number
LEFT JOIN block_signer_signatures bss ON lb.block_height = bss.block_height AND rs.signer_key = bss.signer_key
LEFT JOIN filtered_block_responses fbr ON fbr.signer_key = rs.signer_key AND fbr.signer_sighash = lb.block_hash
LEFT JOIN block_responses fbr ON fbr.signer_key = rs.signer_key AND fbr.signer_sighash = lb.block_hash
),
signer_state_aggregation AS (
SELECT
Expand Down Expand Up @@ -282,36 +267,24 @@ export class PgStore extends BasePgStore {
WHERE rss.cycle_number = ${cycleNumber}
),
proposal_data AS (
-- Fetch the first (oldest) proposal for each block_hash for the given cycle
-- Select all proposals for the given cycle
SELECT
bp.block_hash,
bp.block_height,
bp.received_at AS proposal_received_at
FROM block_proposals bp
WHERE bp.reward_cycle = ${cycleNumber}
AND bp.id = (
-- Select the earliest proposal for each block_hash
SELECT MIN(sub_bp.id)
FROM block_proposals sub_bp
WHERE sub_bp.block_hash = bp.block_hash
)
),
response_data AS (
-- Fetch the first (oldest) response for each (signer_key, signer_sighash) pair
SELECT DISTINCT ON (br.signer_key, br.signer_sighash)
-- Select responses associated with the proposals from the given cycle
SELECT
br.signer_key,
br.signer_sighash,
br.accepted,
br.received_at,
br.id
FROM block_responses br
WHERE br.id = (
-- Select the earliest response for each signer_sighash and signer_key
SELECT MIN(sub_br.id)
FROM block_responses sub_br
WHERE sub_br.signer_key = br.signer_key
AND sub_br.signer_sighash = br.signer_sighash
)
JOIN proposal_data pd ON br.signer_sighash = pd.block_hash -- Only responses linked to selected proposals
),
signer_proposal_data AS (
-- Cross join signers with proposals and left join filtered responses
Expand Down Expand Up @@ -373,7 +346,7 @@ export class PgStore extends BasePgStore {
}[]
>`
WITH signer_data AS (
-- Fetch the signer for the given cycle
-- Fetch the specific signer for the given cycle
SELECT
rss.signer_key,
rss.signer_weight,
Expand All @@ -383,39 +356,28 @@ export class PgStore extends BasePgStore {
AND rss.signer_key = ${normalizeHexString(signerId)}
),
proposal_data AS (
-- Fetch the first (oldest) proposal for each block_hash for the given cycle
-- Select all proposals for the given cycle
SELECT
bp.block_hash,
bp.block_height,
bp.received_at AS proposal_received_at
FROM block_proposals bp
WHERE bp.reward_cycle = ${cycleNumber}
AND bp.id = (
-- Select the earliest proposal for each block_hash
SELECT MIN(sub_bp.id)
FROM block_proposals sub_bp
WHERE sub_bp.block_hash = bp.block_hash
)
),
response_data AS (
-- Fetch the first (oldest) response for each (signer_key, signer_sighash) pair
SELECT DISTINCT ON (br.signer_key, br.signer_sighash)
-- Select all responses for the proposals in the given cycle
SELECT
br.signer_key,
br.signer_sighash,
br.accepted,
br.received_at,
br.id
FROM block_responses br
WHERE br.id = (
-- Select the earliest response for each signer_sighash and signer_key
SELECT MIN(sub_br.id)
FROM block_responses sub_br
WHERE sub_br.signer_key = br.signer_key
AND sub_br.signer_sighash = br.signer_sighash
)
JOIN proposal_data pd ON br.signer_sighash = pd.block_hash
WHERE br.signer_key = ${normalizeHexString(signerId)} -- Filter for the specific signer
),
signer_proposal_data AS (
-- Cross join signers with proposals and left join filtered responses
-- Cross join the specific signer with proposals and left join filtered responses
SELECT
sd.signer_key,
pd.block_hash,
Expand All @@ -424,13 +386,13 @@ export class PgStore extends BasePgStore {
rd.received_at AS response_received_at,
EXTRACT(MILLISECOND FROM (rd.received_at - pd.proposal_received_at)) AS response_time_ms
FROM signer_data sd
CROSS JOIN proposal_data pd -- Cross join to associate all signers with all proposals
CROSS JOIN proposal_data pd
LEFT JOIN response_data rd
ON pd.block_hash = rd.signer_sighash
AND sd.signer_key = rd.signer_key -- Match signers with their corresponding responses
),
aggregated_data AS (
-- Aggregate the proposal and response data by signer
-- Aggregate the proposal and response data for the specific signer
SELECT
spd.signer_key,
COUNT(CASE WHEN spd.accepted = true THEN 1 END)::integer AS proposals_accepted_count,
Expand Down
Loading