Skip to content

Commit

Permalink
fix: process nft and sft mints in batches (#271)
Browse files Browse the repository at this point in the history
* fix: process nft and sft mints in batches

* fix: better batching

* fix: rollbacks
  • Loading branch information
rafaelcr authored Oct 7, 2024
1 parent ae4c71f commit c98f0cd
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 160 deletions.
242 changes: 98 additions & 144 deletions src/pg/chainhook/chainhook-pg-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,7 @@ import {
TokenMetadataUpdateNotification,
} from '../../token-processor/util/sip-validation';
import { ContractNotFoundError } from '../errors';
import {
DbJob,
DbSipNumber,
DbSmartContractInsert,
DbTokenInsert,
DbTokenType,
DbSmartContract,
} from '../types';
import { DbJob, DbSipNumber, DbSmartContractInsert, DbTokenType, DbSmartContract } from '../types';
import { BlockCache, CachedEvent } from './block-cache';
import { dbSipNumberToDbTokenType } from '../../token-processor/util/helpers';
import BigNumber from 'bignumber.js';
Expand Down Expand Up @@ -66,13 +59,16 @@ export class ChainhookPgStore extends BasePgStoreModule {

/**
* Inserts new tokens and new token queue entries until `token_count` items are created, usually
* used when processing an NFT contract.
* used when processing an NFT contract that has just been deployed.
*/
async insertAndEnqueueSequentialTokens(args: {
smart_contract: DbSmartContract;
token_count: bigint;
}): Promise<void> {
const tokenValues: DbTokenInsert[] = [];
async insertAndEnqueueSequentialTokens(
sql: PgSqlClient,
args: {
smart_contract: DbSmartContract;
token_count: bigint;
}
): Promise<void> {
const tokenValues = [];
for (let index = 1; index <= args.token_count; index++)
tokenValues.push({
smart_contract_id: args.smart_contract.id,
Expand All @@ -83,7 +79,25 @@ export class ChainhookPgStore extends BasePgStoreModule {
tx_id: args.smart_contract.tx_id,
tx_index: args.smart_contract.tx_index,
});
return this.insertAndEnqueueTokens(tokenValues);
for await (const batch of batchIterate(tokenValues, 500)) {
await sql`
WITH token_inserts AS (
INSERT INTO tokens ${sql(batch)}
ON CONFLICT ON CONSTRAINT tokens_smart_contract_id_token_number_unique DO
UPDATE SET
uri = EXCLUDED.uri,
name = EXCLUDED.name,
symbol = EXCLUDED.symbol,
decimals = EXCLUDED.decimals,
total_supply = EXCLUDED.total_supply,
updated_at = NOW()
RETURNING id
)
INSERT INTO jobs (token_id) (SELECT id AS token_id FROM token_inserts)
ON CONFLICT (token_id) WHERE smart_contract_id IS NULL DO
UPDATE SET updated_at = NOW(), status = 'pending'
`;
}
}

async applyContractDeployment(
Expand Down Expand Up @@ -150,8 +164,8 @@ export class ChainhookPgStore extends BasePgStoreModule {
await this.applyContractDeployment(sql, contract, cache);
for (const notification of cache.notifications)
await this.applyNotification(sql, notification, cache);
for (const mint of cache.nftMints) await this.applyNftMint(sql, mint, cache);
for (const mint of cache.sftMints) await this.applySftMint(sql, mint, cache);
await this.applyTokenMints(sql, cache.nftMints, DbTokenType.nft, cache);
await this.applyTokenMints(sql, cache.sftMints, DbTokenType.sft, cache);
for (const [contract, delta] of cache.ftSupplyDelta)
await this.applyFtSupplyChange(sql, contract, delta, cache);
}
Expand All @@ -161,8 +175,8 @@ export class ChainhookPgStore extends BasePgStoreModule {
await this.rollBackContractDeployment(sql, contract, cache);
for (const notification of cache.notifications)
await this.rollBackNotification(sql, notification, cache);
for (const mint of cache.nftMints) await this.rollBackNftMint(sql, mint, cache);
for (const mint of cache.sftMints) await this.rollBackSftMint(sql, mint, cache);
await this.rollBackTokenMints(sql, cache.nftMints, DbTokenType.nft, cache);
await this.rollBackTokenMints(sql, cache.sftMints, DbTokenType.sft, cache);
for (const [contract, delta] of cache.ftSupplyDelta)
await this.applyFtSupplyChange(sql, contract, delta.negated(), cache);
}
Expand Down Expand Up @@ -223,68 +237,6 @@ export class ChainhookPgStore extends BasePgStoreModule {
);
}

private async applyNftMint(
sql: PgSqlClient,
mint: CachedEvent<NftMintEvent>,
cache: BlockCache
): Promise<void> {
try {
await this.insertAndEnqueueTokens([
{
smart_contract_id: await this.findSmartContractId(
mint.event.contractId,
DbSipNumber.sip009
),
type: DbTokenType.nft,
token_number: mint.event.tokenId.toString(),
block_height: cache.block.index,
index_block_hash: cache.block.hash,
tx_id: mint.tx_id,
tx_index: mint.tx_index,
},
]);
logger.info(
`ChainhookPgStore apply NFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}`
);
} catch (error) {
if (error instanceof ContractNotFoundError)
logger.warn(
`ChainhookPgStore found NFT mint for nonexisting contract ${mint.event.contractId}`
);
else throw error;
}
}

private async applySftMint(
sql: PgSqlClient,
mint: CachedEvent<SftMintEvent>,
cache: BlockCache
): Promise<void> {
try {
await this.insertAndEnqueueTokens([
{
smart_contract_id: await this.findSmartContractId(
mint.event.contractId,
DbSipNumber.sip013
),
type: DbTokenType.sft,
token_number: mint.event.tokenId.toString(),
block_height: cache.block.index,
index_block_hash: cache.block.hash,
tx_id: mint.tx_id,
tx_index: mint.tx_index,
},
]);
logger.info(
`ChainhookPgStore apply SFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}`
);
} catch (error) {
if (error instanceof ContractNotFoundError)
logger.warn(error, `ChainhookPgStore found SFT mint for nonexisting contract`);
else throw error;
}
}

private async applyFtSupplyChange(
sql: PgSqlClient,
contract: string,
Expand Down Expand Up @@ -333,64 +285,6 @@ export class ChainhookPgStore extends BasePgStoreModule {
);
}

private async rollBackNftMint(
sql: PgSqlClient,
mint: CachedEvent<NftMintEvent>,
cache: BlockCache
): Promise<void> {
try {
const smart_contract_id = await this.findSmartContractId(
mint.event.contractId,
DbSipNumber.sip009
);
await sql`
DELETE FROM tokens
WHERE smart_contract_id = ${smart_contract_id} AND token_number = ${mint.event.tokenId}
`;
logger.info(
`ChainhookPgStore rollback NFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}`
);
} catch (error) {
if (error instanceof ContractNotFoundError)
logger.warn(error, `ChainhookPgStore found NFT mint for nonexisting contract`);
else throw error;
}
}

private async rollBackSftMint(
sql: PgSqlClient,
mint: CachedEvent<SftMintEvent>,
cache: BlockCache
): Promise<void> {
try {
const smart_contract_id = await this.findSmartContractId(
mint.event.contractId,
DbSipNumber.sip013
);
await sql`
DELETE FROM tokens
WHERE smart_contract_id = ${smart_contract_id} AND token_number = ${mint.event.tokenId}
`;
logger.info(
`ChainhookPgStore rollback SFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}`
);
} catch (error) {
if (error instanceof ContractNotFoundError)
logger.warn(error, `ChainhookPgStore found SFT mint for nonexisting contract`);
else throw error;
}
}

private async findSmartContractId(principal: string, sip: DbSipNumber): Promise<number> {
const result = await this.sql<{ id: number }[]>`
SELECT id
FROM smart_contracts
WHERE principal = ${principal} AND sip = ${sip}
`;
if (result.count) return result[0].id;
throw new ContractNotFoundError();
}

private async enqueueDynamicTokensDueForRefresh(): Promise<void> {
const interval = ENV.METADATA_DYNAMIC_TOKEN_REFRESH_INTERVAL.toString();
await this.sql`
Expand Down Expand Up @@ -420,11 +314,42 @@ export class ChainhookPgStore extends BasePgStoreModule {
`;
}

private async insertAndEnqueueTokens(tokenValues: DbTokenInsert[]): Promise<void> {
for await (const batch of batchIterate(tokenValues, 500)) {
await this.sql<DbJob[]>`
WITH token_inserts AS (
INSERT INTO tokens ${this.sql(batch)}
private async applyTokenMints(
sql: PgSqlClient,
mints: CachedEvent<NftMintEvent>[],
tokenType: DbTokenType,
cache: BlockCache
): Promise<void> {
if (mints.length == 0) return;
for await (const batch of batchIterate(mints, 500)) {
const values = batch.map(m => {
logger.info(
`ChainhookPgStore apply ${tokenType.toUpperCase()} mint ${m.event.contractId} (${
m.event.tokenId
}) at block ${cache.block.index}`
);
return [
m.event.contractId,
tokenType,
m.event.tokenId.toString(),
cache.block.index,
cache.block.hash,
m.tx_id,
m.tx_index,
];
});
await sql`
WITH insert_values (principal, type, token_number, block_height, index_block_hash, tx_id,
tx_index) AS (VALUES ${sql(values)}),
filtered_values AS (
SELECT s.id AS smart_contract_id, i.type::token_type, i.token_number::bigint,
i.block_height::bigint, i.index_block_hash::text, i.tx_id::text, i.tx_index::int
FROM insert_values AS i
INNER JOIN smart_contracts AS s ON s.principal = i.principal::text
),
token_inserts AS (
INSERT INTO tokens (smart_contract_id, type, token_number, block_height, index_block_hash,
tx_id, tx_index) (SELECT * FROM filtered_values)
ON CONFLICT ON CONSTRAINT tokens_smart_contract_id_token_number_unique DO
UPDATE SET
uri = EXCLUDED.uri,
Expand All @@ -441,4 +366,33 @@ export class ChainhookPgStore extends BasePgStoreModule {
`;
}
}

private async rollBackTokenMints(
sql: PgSqlClient,
mints: CachedEvent<NftMintEvent>[],
tokenType: DbTokenType,
cache: BlockCache
): Promise<void> {
if (mints.length == 0) return;
for await (const batch of batchIterate(mints, 500)) {
const values = batch.map(m => {
logger.info(
`ChainhookPgStore rollback ${tokenType.toUpperCase()} mint ${m.event.contractId} (${
m.event.tokenId
}) at block ${cache.block.index}`
);
return [m.event.contractId, m.event.tokenId.toString()];
});
await sql`
WITH delete_values (principal, token_number) AS (VALUES ${sql(values)})
DELETE FROM tokens WHERE id IN (
SELECT t.id
FROM delete_values AS d
INNER JOIN smart_contracts AS s ON s.principal = d.principal::text
INNER JOIN tokens AS t
ON t.smart_contract_id = s.id AND t.token_number = d.token_number::bigint
)
`;
}
}
}
12 changes: 1 addition & 11 deletions src/pg/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { PgJsonb, PgNumeric } from '@hirosystems/api-toolkit';
import { PgJsonb, PgNumeric, PgSqlQuery } from '@hirosystems/api-toolkit';
import { FtOrderBy, Order } from '../api/schemas';

export enum DbSipNumber {
Expand Down Expand Up @@ -69,16 +69,6 @@ export type DbSmartContract = {
non_fungible_token_name?: string;
};

export type DbTokenInsert = {
smart_contract_id: number;
type: DbTokenType;
token_number: PgNumeric;
block_height: number;
index_block_hash: string;
tx_id: string;
tx_index: number;
};

export type DbToken = {
id: number;
smart_contract_id: number;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class ProcessSmartContractJob extends Job {
`ProcessSmartContractJob enqueueing ${tokenCount} tokens for ${this.description()}`
);
await this.db.updateSmartContractTokenCount({ id: contract.id, count: tokenCount });
await this.db.chainhook.insertAndEnqueueSequentialTokens({
await this.db.chainhook.insertAndEnqueueSequentialTokens(sql, {
smart_contract: contract,
token_count: tokenCount,
});
Expand Down
4 changes: 1 addition & 3 deletions src/token-processor/util/sip-validation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -375,9 +375,7 @@ export type NftMintEvent = {
tokenId: bigint;
};

export type SftMintEvent = {
contractId: string;
tokenId: bigint;
export type SftMintEvent = NftMintEvent & {
amount: bigint;
recipient: string;
};
Expand Down
26 changes: 26 additions & 0 deletions tests/chainhook/nft-events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,32 @@ describe('NFT events', () => {
await expect(db.getToken({ id: 1 })).resolves.not.toBeUndefined();
});

test('NFT mint is ignored if contract does not exist', async () => {
const address = 'SP1K1A1PMGW2ZJCNF46NWZWHG8TS1D23EGH1KNK60';
const contractId = `${address}.friedger-pool-nft`;

await db.chainhook.processPayload(
new TestChainhookPayloadBuilder()
.apply()
.block({ height: 100 })
.transaction({ hash: '0x01', sender: address })
.event({
type: 'NFTMintEvent',
position: { index: 0 },
data: {
asset_identifier: `${contractId}::crashpunks-v2`,
recipient: address,
raw_value: cvToHex(uintCV(1)),
},
})
.build()
);

const jobs = await db.getPendingJobBatch({ limit: 1 });
expect(jobs).toHaveLength(0);
await expect(db.getToken({ id: 1 })).resolves.toBeUndefined();
});

test('NFT mint roll back removes token', async () => {
const address = 'SP1K1A1PMGW2ZJCNF46NWZWHG8TS1D23EGH1KNK60';
const contractId = `${address}.friedger-pool-nft`;
Expand Down
2 changes: 1 addition & 1 deletion tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1436,7 +1436,7 @@ export async function insertAndEnqueueTestContractWithTokens(
return await db.sqlWriteTransaction(async sql => {
await insertAndEnqueueTestContract(db, principal, sip, tx_id);
const smart_contract = (await db.getSmartContract({ principal })) as DbSmartContract;
await db.chainhook.insertAndEnqueueSequentialTokens({
await db.chainhook.insertAndEnqueueSequentialTokens(sql, {
smart_contract,
token_count,
});
Expand Down

0 comments on commit c98f0cd

Please sign in to comment.