diff --git a/src/pg/chainhook/chainhook-pg-store.ts b/src/pg/chainhook/chainhook-pg-store.ts index 86cb5087..1cfc0a52 100644 --- a/src/pg/chainhook/chainhook-pg-store.ts +++ b/src/pg/chainhook/chainhook-pg-store.ts @@ -14,14 +14,7 @@ import { TokenMetadataUpdateNotification, } from '../../token-processor/util/sip-validation'; import { ContractNotFoundError } from '../errors'; -import { - DbJob, - DbSipNumber, - DbSmartContractInsert, - DbTokenInsert, - DbTokenType, - DbSmartContract, -} from '../types'; +import { DbJob, DbSipNumber, DbSmartContractInsert, DbTokenType, DbSmartContract } from '../types'; import { BlockCache, CachedEvent } from './block-cache'; import { dbSipNumberToDbTokenType } from '../../token-processor/util/helpers'; import BigNumber from 'bignumber.js'; @@ -66,13 +59,16 @@ export class ChainhookPgStore extends BasePgStoreModule { /** * Inserts new tokens and new token queue entries until `token_count` items are created, usually - * used when processing an NFT contract. + * used when processing an NFT contract that has just been deployed. */ - async insertAndEnqueueSequentialTokens(args: { - smart_contract: DbSmartContract; - token_count: bigint; - }): Promise { - const tokenValues: DbTokenInsert[] = []; + async insertAndEnqueueSequentialTokens( + sql: PgSqlClient, + args: { + smart_contract: DbSmartContract; + token_count: bigint; + } + ): Promise { + const tokenValues = []; for (let index = 1; index <= args.token_count; index++) tokenValues.push({ smart_contract_id: args.smart_contract.id, @@ -83,7 +79,25 @@ export class ChainhookPgStore extends BasePgStoreModule { tx_id: args.smart_contract.tx_id, tx_index: args.smart_contract.tx_index, }); - return this.insertAndEnqueueTokens(tokenValues); + for await (const batch of batchIterate(tokenValues, 500)) { + await sql` + WITH token_inserts AS ( + INSERT INTO tokens ${sql(batch)} + ON CONFLICT ON CONSTRAINT tokens_smart_contract_id_token_number_unique DO + UPDATE SET + uri = EXCLUDED.uri, + name = EXCLUDED.name, + symbol = EXCLUDED.symbol, + decimals = EXCLUDED.decimals, + total_supply = EXCLUDED.total_supply, + updated_at = NOW() + RETURNING id + ) + INSERT INTO jobs (token_id) (SELECT id AS token_id FROM token_inserts) + ON CONFLICT (token_id) WHERE smart_contract_id IS NULL DO + UPDATE SET updated_at = NOW(), status = 'pending' + `; + } } async applyContractDeployment( @@ -150,8 +164,8 @@ export class ChainhookPgStore extends BasePgStoreModule { await this.applyContractDeployment(sql, contract, cache); for (const notification of cache.notifications) await this.applyNotification(sql, notification, cache); - for (const mint of cache.nftMints) await this.applyNftMint(sql, mint, cache); - for (const mint of cache.sftMints) await this.applySftMint(sql, mint, cache); + await this.applyTokenMints(sql, cache.nftMints, DbTokenType.nft, cache); + await this.applyTokenMints(sql, cache.sftMints, DbTokenType.sft, cache); for (const [contract, delta] of cache.ftSupplyDelta) await this.applyFtSupplyChange(sql, contract, delta, cache); } @@ -161,8 +175,8 @@ export class ChainhookPgStore extends BasePgStoreModule { await this.rollBackContractDeployment(sql, contract, cache); for (const notification of cache.notifications) await this.rollBackNotification(sql, notification, cache); - for (const mint of cache.nftMints) await this.rollBackNftMint(sql, mint, cache); - for (const mint of cache.sftMints) await this.rollBackSftMint(sql, mint, cache); + await this.rollBackTokenMints(sql, cache.nftMints, DbTokenType.nft, cache); + await this.rollBackTokenMints(sql, cache.sftMints, DbTokenType.sft, cache); for (const [contract, delta] of cache.ftSupplyDelta) await this.applyFtSupplyChange(sql, contract, delta.negated(), cache); } @@ -223,68 +237,6 @@ export class ChainhookPgStore extends BasePgStoreModule { ); } - private async applyNftMint( - sql: PgSqlClient, - mint: CachedEvent, - cache: BlockCache - ): Promise { - try { - await this.insertAndEnqueueTokens([ - { - smart_contract_id: await this.findSmartContractId( - mint.event.contractId, - DbSipNumber.sip009 - ), - type: DbTokenType.nft, - token_number: mint.event.tokenId.toString(), - block_height: cache.block.index, - index_block_hash: cache.block.hash, - tx_id: mint.tx_id, - tx_index: mint.tx_index, - }, - ]); - logger.info( - `ChainhookPgStore apply NFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}` - ); - } catch (error) { - if (error instanceof ContractNotFoundError) - logger.warn( - `ChainhookPgStore found NFT mint for nonexisting contract ${mint.event.contractId}` - ); - else throw error; - } - } - - private async applySftMint( - sql: PgSqlClient, - mint: CachedEvent, - cache: BlockCache - ): Promise { - try { - await this.insertAndEnqueueTokens([ - { - smart_contract_id: await this.findSmartContractId( - mint.event.contractId, - DbSipNumber.sip013 - ), - type: DbTokenType.sft, - token_number: mint.event.tokenId.toString(), - block_height: cache.block.index, - index_block_hash: cache.block.hash, - tx_id: mint.tx_id, - tx_index: mint.tx_index, - }, - ]); - logger.info( - `ChainhookPgStore apply SFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}` - ); - } catch (error) { - if (error instanceof ContractNotFoundError) - logger.warn(error, `ChainhookPgStore found SFT mint for nonexisting contract`); - else throw error; - } - } - private async applyFtSupplyChange( sql: PgSqlClient, contract: string, @@ -333,64 +285,6 @@ export class ChainhookPgStore extends BasePgStoreModule { ); } - private async rollBackNftMint( - sql: PgSqlClient, - mint: CachedEvent, - cache: BlockCache - ): Promise { - try { - const smart_contract_id = await this.findSmartContractId( - mint.event.contractId, - DbSipNumber.sip009 - ); - await sql` - DELETE FROM tokens - WHERE smart_contract_id = ${smart_contract_id} AND token_number = ${mint.event.tokenId} - `; - logger.info( - `ChainhookPgStore rollback NFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}` - ); - } catch (error) { - if (error instanceof ContractNotFoundError) - logger.warn(error, `ChainhookPgStore found NFT mint for nonexisting contract`); - else throw error; - } - } - - private async rollBackSftMint( - sql: PgSqlClient, - mint: CachedEvent, - cache: BlockCache - ): Promise { - try { - const smart_contract_id = await this.findSmartContractId( - mint.event.contractId, - DbSipNumber.sip013 - ); - await sql` - DELETE FROM tokens - WHERE smart_contract_id = ${smart_contract_id} AND token_number = ${mint.event.tokenId} - `; - logger.info( - `ChainhookPgStore rollback SFT mint ${mint.event.contractId} (${mint.event.tokenId}) at block ${cache.block.index}` - ); - } catch (error) { - if (error instanceof ContractNotFoundError) - logger.warn(error, `ChainhookPgStore found SFT mint for nonexisting contract`); - else throw error; - } - } - - private async findSmartContractId(principal: string, sip: DbSipNumber): Promise { - const result = await this.sql<{ id: number }[]>` - SELECT id - FROM smart_contracts - WHERE principal = ${principal} AND sip = ${sip} - `; - if (result.count) return result[0].id; - throw new ContractNotFoundError(); - } - private async enqueueDynamicTokensDueForRefresh(): Promise { const interval = ENV.METADATA_DYNAMIC_TOKEN_REFRESH_INTERVAL.toString(); await this.sql` @@ -420,11 +314,42 @@ export class ChainhookPgStore extends BasePgStoreModule { `; } - private async insertAndEnqueueTokens(tokenValues: DbTokenInsert[]): Promise { - for await (const batch of batchIterate(tokenValues, 500)) { - await this.sql` - WITH token_inserts AS ( - INSERT INTO tokens ${this.sql(batch)} + private async applyTokenMints( + sql: PgSqlClient, + mints: CachedEvent[], + tokenType: DbTokenType, + cache: BlockCache + ): Promise { + if (mints.length == 0) return; + for await (const batch of batchIterate(mints, 500)) { + const values = batch.map(m => { + logger.info( + `ChainhookPgStore apply ${tokenType.toUpperCase()} mint ${m.event.contractId} (${ + m.event.tokenId + }) at block ${cache.block.index}` + ); + return [ + m.event.contractId, + tokenType, + m.event.tokenId.toString(), + cache.block.index, + cache.block.hash, + m.tx_id, + m.tx_index, + ]; + }); + await sql` + WITH insert_values (principal, type, token_number, block_height, index_block_hash, tx_id, + tx_index) AS (VALUES ${sql(values)}), + filtered_values AS ( + SELECT s.id AS smart_contract_id, i.type::token_type, i.token_number::bigint, + i.block_height::bigint, i.index_block_hash::text, i.tx_id::text, i.tx_index::int + FROM insert_values AS i + INNER JOIN smart_contracts AS s ON s.principal = i.principal::text + ), + token_inserts AS ( + INSERT INTO tokens (smart_contract_id, type, token_number, block_height, index_block_hash, + tx_id, tx_index) (SELECT * FROM filtered_values) ON CONFLICT ON CONSTRAINT tokens_smart_contract_id_token_number_unique DO UPDATE SET uri = EXCLUDED.uri, @@ -441,4 +366,33 @@ export class ChainhookPgStore extends BasePgStoreModule { `; } } + + private async rollBackTokenMints( + sql: PgSqlClient, + mints: CachedEvent[], + tokenType: DbTokenType, + cache: BlockCache + ): Promise { + if (mints.length == 0) return; + for await (const batch of batchIterate(mints, 500)) { + const values = batch.map(m => { + logger.info( + `ChainhookPgStore rollback ${tokenType.toUpperCase()} mint ${m.event.contractId} (${ + m.event.tokenId + }) at block ${cache.block.index}` + ); + return [m.event.contractId, m.event.tokenId.toString()]; + }); + await sql` + WITH delete_values (principal, token_number) AS (VALUES ${sql(values)}) + DELETE FROM tokens WHERE id IN ( + SELECT t.id + FROM delete_values AS d + INNER JOIN smart_contracts AS s ON s.principal = d.principal::text + INNER JOIN tokens AS t + ON t.smart_contract_id = s.id AND t.token_number = d.token_number::bigint + ) + `; + } + } } diff --git a/src/pg/types.ts b/src/pg/types.ts index 85231cd9..b84f07da 100644 --- a/src/pg/types.ts +++ b/src/pg/types.ts @@ -1,4 +1,4 @@ -import { PgJsonb, PgNumeric } from '@hirosystems/api-toolkit'; +import { PgJsonb, PgNumeric, PgSqlQuery } from '@hirosystems/api-toolkit'; import { FtOrderBy, Order } from '../api/schemas'; export enum DbSipNumber { @@ -69,16 +69,6 @@ export type DbSmartContract = { non_fungible_token_name?: string; }; -export type DbTokenInsert = { - smart_contract_id: number; - type: DbTokenType; - token_number: PgNumeric; - block_height: number; - index_block_hash: string; - tx_id: string; - tx_index: number; -}; - export type DbToken = { id: number; smart_contract_id: number; diff --git a/src/token-processor/queue/job/process-smart-contract-job.ts b/src/token-processor/queue/job/process-smart-contract-job.ts index 7d946414..1aeab0fa 100644 --- a/src/token-processor/queue/job/process-smart-contract-job.ts +++ b/src/token-processor/queue/job/process-smart-contract-job.ts @@ -74,7 +74,7 @@ export class ProcessSmartContractJob extends Job { `ProcessSmartContractJob enqueueing ${tokenCount} tokens for ${this.description()}` ); await this.db.updateSmartContractTokenCount({ id: contract.id, count: tokenCount }); - await this.db.chainhook.insertAndEnqueueSequentialTokens({ + await this.db.chainhook.insertAndEnqueueSequentialTokens(sql, { smart_contract: contract, token_count: tokenCount, }); diff --git a/src/token-processor/util/sip-validation.ts b/src/token-processor/util/sip-validation.ts index 7524a47c..d7198a0d 100644 --- a/src/token-processor/util/sip-validation.ts +++ b/src/token-processor/util/sip-validation.ts @@ -375,9 +375,7 @@ export type NftMintEvent = { tokenId: bigint; }; -export type SftMintEvent = { - contractId: string; - tokenId: bigint; +export type SftMintEvent = NftMintEvent & { amount: bigint; recipient: string; }; diff --git a/tests/chainhook/nft-events.test.ts b/tests/chainhook/nft-events.test.ts index 1e57e8db..0770f37b 100644 --- a/tests/chainhook/nft-events.test.ts +++ b/tests/chainhook/nft-events.test.ts @@ -92,6 +92,32 @@ describe('NFT events', () => { await expect(db.getToken({ id: 1 })).resolves.not.toBeUndefined(); }); + test('NFT mint is ignored if contract does not exist', async () => { + const address = 'SP1K1A1PMGW2ZJCNF46NWZWHG8TS1D23EGH1KNK60'; + const contractId = `${address}.friedger-pool-nft`; + + await db.chainhook.processPayload( + new TestChainhookPayloadBuilder() + .apply() + .block({ height: 100 }) + .transaction({ hash: '0x01', sender: address }) + .event({ + type: 'NFTMintEvent', + position: { index: 0 }, + data: { + asset_identifier: `${contractId}::crashpunks-v2`, + recipient: address, + raw_value: cvToHex(uintCV(1)), + }, + }) + .build() + ); + + const jobs = await db.getPendingJobBatch({ limit: 1 }); + expect(jobs).toHaveLength(0); + await expect(db.getToken({ id: 1 })).resolves.toBeUndefined(); + }); + test('NFT mint roll back removes token', async () => { const address = 'SP1K1A1PMGW2ZJCNF46NWZWHG8TS1D23EGH1KNK60'; const contractId = `${address}.friedger-pool-nft`; diff --git a/tests/helpers.ts b/tests/helpers.ts index 44b8dbd2..a116ce4f 100644 --- a/tests/helpers.ts +++ b/tests/helpers.ts @@ -1436,7 +1436,7 @@ export async function insertAndEnqueueTestContractWithTokens( return await db.sqlWriteTransaction(async sql => { await insertAndEnqueueTestContract(db, principal, sip, tx_id); const smart_contract = (await db.getSmartContract({ principal })) as DbSmartContract; - await db.chainhook.insertAndEnqueueSequentialTokens({ + await db.chainhook.insertAndEnqueueSequentialTokens(sql, { smart_contract, token_count, });