From 79334e23d1ab0d5b27b3df90c1160b487f498026 Mon Sep 17 00:00:00 2001 From: Matthew Little Date: Fri, 15 Nov 2024 17:34:42 +0100 Subject: [PATCH] feat: socket-io blockProposal notifications (#58) * feat: socket-io notifications * test: socket-io blockProposal tests * chore: no client lib for now * chore: fix ci --- jest.config.ts | 1 + package-lock.json | 266 ++++++++++++++++++- package.json | 4 +- src/api/init.ts | 2 + src/api/routes/block-proposals.ts | 94 +++---- src/api/routes/socket-io.ts | 84 ++++++ src/pg/chainhook/chainhook-pg-store.ts | 112 ++++++-- src/pg/notifications/pg-notifications.ts | 66 +++++ src/pg/pg-store.ts | 4 + src/pg/types.ts | 25 +- src/stacks-core-rpc/stacker-set-updater.ts | 2 +- tests/db/db-notifications.test.ts | 124 +++++++++ tests/db/duplicate-signer-set-insert.test.ts | 1 + tests/db/endpoints.test.ts | 1 + tests/db/ingestion.test.ts | 1 + tests/db/jest-global-teardown.ts | 2 +- 16 files changed, 711 insertions(+), 78 deletions(-) create mode 100644 src/api/routes/socket-io.ts create mode 100644 src/pg/notifications/pg-notifications.ts create mode 100644 tests/db/db-notifications.test.ts diff --git a/jest.config.ts b/jest.config.ts index 6a34425..46a6802 100644 --- a/jest.config.ts +++ b/jest.config.ts @@ -5,6 +5,7 @@ const jestConfig: JestConfigWithTsJest = { testEnvironment: 'node', coverageProvider: 'v8', collectCoverageFrom: ['src/**/*.ts', 'migrations/*.ts'], + testTimeout: 600000, projects: [ { transform, diff --git a/package-lock.json b/package-lock.json index 4ac5c36..9199e30 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8,6 +8,9 @@ "name": "@hirosystems/signer-metrics-api", "version": "1.0.0", "license": "GPL-3.0", + "workspaces": [ + "client" + ], "dependencies": { "@fastify/cors": "^9.0.1", "@fastify/swagger": "^8.15.0", @@ -25,7 +28,8 @@ "node-pg-migrate": "^6.2.2", "p-queue": "^6.6.2", "pino": "^9.5.0", - "postgres": "^3.3.1" + "postgres": "^3.4.5", + "socket.io": "^4.8.1" }, "devDependencies": { "@commitlint/cli": "^17.4.2", @@ -44,6 +48,7 @@ "jest": "^29.7.0", "prettier": "3.3.3", "rimraf": "^6.0.1", + "socket.io-client": "^4.8.1", "supertest": "^7.0.0", "ts-jest": "^29.2.5", "ts-node": "^10.9.2", @@ -51,6 +56,14 @@ "typescript-eslint": "^8.13.0" } }, + "client": { + "name": "@hirosystems/signer-metrics-api-client", + "version": "0.0.1", + "license": "GPL-3.0", + "dependencies": { + "socket.io-client": "^4.8.1" + } + }, "node_modules/@ampproject/remapping": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/@ampproject/remapping/-/remapping-2.3.0.tgz", @@ -1527,6 +1540,10 @@ "real-require": "^0.2.0" } }, + "node_modules/@hirosystems/signer-metrics-api-client": { + "resolved": "client", + "link": true + }, "node_modules/@humanfs/core": { "version": "0.19.1", "resolved": "https://registry.npmjs.org/@humanfs/core/-/core-0.19.1.tgz", @@ -2380,6 +2397,12 @@ "@sinonjs/commons": "^3.0.0" } }, + "node_modules/@socket.io/component-emitter": { + "version": "3.1.2", + "resolved": "https://registry.npmjs.org/@socket.io/component-emitter/-/component-emitter-3.1.2.tgz", + "integrity": "sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==", + "license": "MIT" + }, "node_modules/@stacks/common": { "version": "6.16.0", "resolved": "https://registry.npmjs.org/@stacks/common/-/common-6.16.0.tgz", @@ -2511,6 +2534,12 @@ "@types/node": "*" } }, + "node_modules/@types/cookie": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/@types/cookie/-/cookie-0.4.1.tgz", + "integrity": "sha512-XW/Aa8APYr6jSVVA1y/DEIZX0/GMKLEVekNG727R8cs56ahETkRAy/3DR7+fJyh7oUgGwNQaRfXCun0+KbWY7Q==", + "license": "MIT" + }, "node_modules/@types/cookiejar": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@types/cookiejar/-/cookiejar-2.1.5.tgz", @@ -2518,6 +2547,15 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/cors": { + "version": "2.8.17", + "resolved": "https://registry.npmjs.org/@types/cors/-/cors-2.8.17.tgz", + "integrity": "sha512-8CGDvrBj1zgo2qE+oS3pOCyYNqCPryMWY2bGfwA0dcfopWGgxs+78df0Rs3rc9THP4JkOhLsAa+15VdpAqkcUA==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/docker-modem": { "version": "3.0.6", "resolved": "https://registry.npmjs.org/@types/docker-modem/-/docker-modem-3.0.6.tgz", @@ -2997,6 +3035,19 @@ "integrity": "sha512-2BjRTZxTPvheOvGbBslFSYOUkr+SjPtOnrLP33f+VIWLzezQpZcqVg7ja3L4dBXmzzgwT+a029jRx5PCi3JuiA==", "license": "MIT" }, + "node_modules/accepts": { + "version": "1.3.8", + "resolved": "https://registry.npmjs.org/accepts/-/accepts-1.3.8.tgz", + "integrity": "sha512-PYAthTa2m2VKxuvSD3DPC/Gy+U+sOA1LAuT8mkmRuvw+NACSaeXEQ+NHcVF7rONl6qcaxV3Uuemwawk+7+SJLw==", + "license": "MIT", + "dependencies": { + "mime-types": "~2.1.34", + "negotiator": "0.6.3" + }, + "engines": { + "node": ">= 0.6" + } + }, "node_modules/acorn": { "version": "8.14.0", "resolved": "https://registry.npmjs.org/acorn/-/acorn-8.14.0.tgz", @@ -3471,6 +3522,15 @@ ], "license": "MIT" }, + "node_modules/base64id": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/base64id/-/base64id-2.0.0.tgz", + "integrity": "sha512-lGe34o6EHj9y3Kts9R4ZYs/Gr+6N7MCaMlIFA3F1R2O5/m7K06AxfSeO5530PEERE6/WyEg3lsuyw4GHlPZHog==", + "license": "MIT", + "engines": { + "node": "^4.5.0 || >= 5.9" + } + }, "node_modules/bcrypt-pbkdf": { "version": "1.0.2", "resolved": "https://registry.npmjs.org/bcrypt-pbkdf/-/bcrypt-pbkdf-1.0.2.tgz", @@ -4102,6 +4162,19 @@ "url": "https://opencollective.com/core-js" } }, + "node_modules/cors": { + "version": "2.8.5", + "resolved": "https://registry.npmjs.org/cors/-/cors-2.8.5.tgz", + "integrity": "sha512-KIHbLJqu73RGr/hnbrO9uBeixNGuvSQjul/jdFvS/KFSIH1hWVd1ng7zOHx+YrEfInLG7q4n6GHQ9cDtxv/P6g==", + "license": "MIT", + "dependencies": { + "object-assign": "^4", + "vary": "^1" + }, + "engines": { + "node": ">= 0.10" + } + }, "node_modules/cosmiconfig": { "version": "8.3.6", "resolved": "https://registry.npmjs.org/cosmiconfig/-/cosmiconfig-8.3.6.tgz", @@ -4744,6 +4817,91 @@ "once": "^1.4.0" } }, + "node_modules/engine.io": { + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/engine.io/-/engine.io-6.6.2.tgz", + "integrity": "sha512-gmNvsYi9C8iErnZdVcJnvCpSKbWTt1E8+JZo8b+daLninywUWi5NQ5STSHZ9rFjFO7imNcvb8Pc5pe/wMR5xEw==", + "license": "MIT", + "dependencies": { + "@types/cookie": "^0.4.1", + "@types/cors": "^2.8.12", + "@types/node": ">=10.0.0", + "accepts": "~1.3.4", + "base64id": "2.0.0", + "cookie": "~0.7.2", + "cors": "~2.8.5", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/engine.io-client": { + "version": "6.6.2", + "resolved": "https://registry.npmjs.org/engine.io-client/-/engine.io-client-6.6.2.tgz", + "integrity": "sha512-TAr+NKeoVTjEVW8P3iHguO1LO6RlUz9O5Y8o7EY0fU+gY1NYqas7NN3slpFtbXEsLMHk0h90fJMfKjRkQ0qUIw==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1", + "engine.io-parser": "~5.2.1", + "ws": "~8.17.1", + "xmlhttprequest-ssl": "~2.1.1" + } + }, + "node_modules/engine.io-client/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, + "node_modules/engine.io-parser": { + "version": "5.2.3", + "resolved": "https://registry.npmjs.org/engine.io-parser/-/engine.io-parser-5.2.3.tgz", + "integrity": "sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/engine.io/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/entities": { "version": "4.5.0", "resolved": "https://registry.npmjs.org/entities/-/entities-4.5.0.tgz", @@ -8105,7 +8263,6 @@ "version": "1.52.0", "resolved": "https://registry.npmjs.org/mime-db/-/mime-db-1.52.0.tgz", "integrity": "sha512-sPU4uV7dYlvtWJxwwxHD0PuihVNiE7TyAbQ5SWxDCB9mUYvOgroQOwYQQOKPJ8CIbE+1ETVlOoK1UC2nU3gYvg==", - "dev": true, "license": "MIT", "engines": { "node": ">= 0.6" @@ -8115,7 +8272,6 @@ "version": "2.1.35", "resolved": "https://registry.npmjs.org/mime-types/-/mime-types-2.1.35.tgz", "integrity": "sha512-ZDY+bPm5zTTF+YpCrAU9nK0UgICYPT0QtT1NZWFv4s++TNkcgVaT0g6+4R2uI4MjQjzysHB1zxuWL50hzaeXiw==", - "dev": true, "license": "MIT", "dependencies": { "mime-db": "1.52.0" @@ -8355,6 +8511,15 @@ "url": "https://nearley.js.org/#give-to-nearley" } }, + "node_modules/negotiator": { + "version": "0.6.3", + "resolved": "https://registry.npmjs.org/negotiator/-/negotiator-0.6.3.tgz", + "integrity": "sha512-+EUsqGPLsM+j/zdChZjsnX51g4XrHFOIXwfnCVPGlQk/k5giakcKsuxCObBRu6DSm9opw/O6slWbJdghQM4bBg==", + "license": "MIT", + "engines": { + "node": ">= 0.6" + } + }, "node_modules/neo-async": { "version": "2.6.2", "resolved": "https://registry.npmjs.org/neo-async/-/neo-async-2.6.2.tgz", @@ -8652,7 +8817,6 @@ "version": "4.1.1", "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.1.1.tgz", "integrity": "sha512-rJgTQnkUnH1sFw8yT6VSU3zD3sWmu6sZhIseY8VX+GRu3P6F7Fu+JNDoXfklElbLJSnc3FUQHVe4cU5hj+BcUg==", - "dev": true, "license": "MIT", "engines": { "node": ">=0.10.0" @@ -10723,6 +10887,83 @@ "node": ">=8.0.0" } }, + "node_modules/socket.io": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io/-/socket.io-4.8.1.tgz", + "integrity": "sha512-oZ7iUCxph8WYRHHcjBEc9unw3adt5CmSNlppj/5Q4k2RIrhl8Z5yY2Xr4j9zj0+wzVZ0bxmYoGSzKJnRl6A4yg==", + "license": "MIT", + "dependencies": { + "accepts": "~1.3.4", + "base64id": "~2.0.0", + "cors": "~2.8.5", + "debug": "~4.3.2", + "engine.io": "~6.6.0", + "socket.io-adapter": "~2.5.2", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.2.0" + } + }, + "node_modules/socket.io-adapter": { + "version": "2.5.5", + "resolved": "https://registry.npmjs.org/socket.io-adapter/-/socket.io-adapter-2.5.5.tgz", + "integrity": "sha512-eLDQas5dzPgOWCk9GuuJC2lBqItuhKI4uxGgo9aIV7MYbk2h9Q6uULEh8WBzThoI7l+qU9Ast9fVUmkqPP9wYg==", + "license": "MIT", + "dependencies": { + "debug": "~4.3.4", + "ws": "~8.17.1" + } + }, + "node_modules/socket.io-adapter/node_modules/ws": { + "version": "8.17.1", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.17.1.tgz", + "integrity": "sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, + "node_modules/socket.io-client": { + "version": "4.8.1", + "resolved": "https://registry.npmjs.org/socket.io-client/-/socket.io-client-4.8.1.tgz", + "integrity": "sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.2", + "engine.io-client": "~6.6.1", + "socket.io-parser": "~4.2.4" + }, + "engines": { + "node": ">=10.0.0" + } + }, + "node_modules/socket.io-parser": { + "version": "4.2.4", + "resolved": "https://registry.npmjs.org/socket.io-parser/-/socket.io-parser-4.2.4.tgz", + "integrity": "sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==", + "license": "MIT", + "dependencies": { + "@socket.io/component-emitter": "~3.1.0", + "debug": "~4.3.1" + }, + "engines": { + "node": ">=10.0.0" + } + }, "node_modules/sonic-boom": { "version": "4.2.0", "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-4.2.0.tgz", @@ -11821,6 +12062,15 @@ "spdx-expression-parse": "^3.0.0" } }, + "node_modules/vary": { + "version": "1.1.2", + "resolved": "https://registry.npmjs.org/vary/-/vary-1.1.2.tgz", + "integrity": "sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==", + "license": "MIT", + "engines": { + "node": ">= 0.8" + } + }, "node_modules/walker": { "version": "1.0.8", "resolved": "https://registry.npmjs.org/walker/-/walker-1.0.8.tgz", @@ -12021,6 +12271,14 @@ } } }, + "node_modules/xmlhttprequest-ssl": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/xmlhttprequest-ssl/-/xmlhttprequest-ssl-2.1.2.tgz", + "integrity": "sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==", + "engines": { + "node": ">=0.4.0" + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/package.json b/package.json index 10a5661..54f6d5f 100644 --- a/package.json +++ b/package.json @@ -44,6 +44,7 @@ "jest": "^29.7.0", "prettier": "3.3.3", "rimraf": "^6.0.1", + "socket.io-client": "^4.8.1", "supertest": "^7.0.0", "ts-jest": "^29.2.5", "ts-node": "^10.9.2", @@ -67,6 +68,7 @@ "node-pg-migrate": "^6.2.2", "p-queue": "^6.6.2", "pino": "^9.5.0", - "postgres": "^3.3.1" + "postgres": "^3.4.5", + "socket.io": "^4.8.1" } } diff --git a/src/api/init.ts b/src/api/init.ts index 51b3a09..d39f5dd 100644 --- a/src/api/init.ts +++ b/src/api/init.ts @@ -11,6 +11,7 @@ import { PINO_LOGGER_CONFIG } from '@hirosystems/api-toolkit'; import { CycleRoutes } from './routes/cycle'; import { BlockRoutes } from './routes/blocks'; import { BlockProposalsRoutes } from './routes/block-proposals'; +import { SocketIORoutes } from './routes/socket-io'; export const Api: FastifyPluginAsync, Server, TypeBoxTypeProvider> = async ( fastify, @@ -22,6 +23,7 @@ export const Api: FastifyPluginAsync, Server, TypeBoxTypePr await fastify.register(CycleRoutes); await fastify.register(BlockRoutes); await fastify.register(BlockProposalsRoutes); + await fastify.register(SocketIORoutes); }, { prefix: '/signer-metrics' } ); diff --git a/src/api/routes/block-proposals.ts b/src/api/routes/block-proposals.ts index dcc7e99..f8b4c4f 100644 --- a/src/api/routes/block-proposals.ts +++ b/src/api/routes/block-proposals.ts @@ -103,56 +103,56 @@ export const BlockProposalsRoutes: FastifyPluginCallback< } ); - function parseDbBlockProposalData(r: DbBlockProposalQueryResponse): BlockProposalsEntry { - const signerData = r.signer_data.map(s => { - const data: BlockProposalSignerData = { - signer_key: s.signer_key, - slot_index: s.slot_index, - response: s.response, - weight: s.weight, - weight_percentage: Number( - BigNumber(s.weight).div(r.total_signer_weight).times(100).toFixed(3) - ), - stacked_amount: s.stacked_amount, - version: s.version, - received_at: s.received_at ? new Date(s.received_at).toISOString() : null, - response_time_ms: s.received_at - ? differenceInMilliseconds(new Date(s.received_at), r.received_at) - : null, - reason_string: s.reason_string, - reason_code: s.reason_code, - reject_code: s.reject_code, - }; - return data; - }); + done(); +}; - const entry: BlockProposalsEntry = { - received_at: r.received_at.toISOString(), - block_height: r.block_height, - block_hash: r.block_hash, - index_block_hash: r.index_block_hash, - burn_block_height: r.burn_block_height, - block_time: r.block_time, - cycle_number: r.cycle_number, - status: r.status, +export function parseDbBlockProposalData(r: DbBlockProposalQueryResponse): BlockProposalsEntry { + const signerData = r.signer_data.map(s => { + const data: BlockProposalSignerData = { + signer_key: s.signer_key, + slot_index: s.slot_index, + response: s.response, + weight: s.weight, + weight_percentage: Number( + BigNumber(s.weight).div(r.total_signer_weight).times(100).toFixed(3) + ), + stacked_amount: s.stacked_amount, + version: s.version, + received_at: s.received_at ? new Date(s.received_at).toISOString() : null, + response_time_ms: s.received_at + ? differenceInMilliseconds(new Date(s.received_at), r.received_at) + : null, + reason_string: s.reason_string, + reason_code: s.reason_code, + reject_code: s.reject_code, + }; + return data; + }); - // cycle data - total_signer_count: r.total_signer_count, - total_signer_weight: r.total_signer_weight, - total_signer_stacked_amount: r.total_signer_stacked_amount, + const entry: BlockProposalsEntry = { + received_at: r.received_at.toISOString(), + block_height: r.block_height, + block_hash: r.block_hash, + index_block_hash: r.index_block_hash, + burn_block_height: r.burn_block_height, + block_time: r.block_time, + cycle_number: r.cycle_number, + status: r.status, - accepted_count: r.accepted_count, - rejected_count: r.rejected_count, - missing_count: r.missing_count, + // cycle data + total_signer_count: r.total_signer_count, + total_signer_weight: r.total_signer_weight, + total_signer_stacked_amount: r.total_signer_stacked_amount, - accepted_weight: r.accepted_weight, - rejected_weight: r.rejected_weight, - missing_weight: r.missing_weight, + accepted_count: r.accepted_count, + rejected_count: r.rejected_count, + missing_count: r.missing_count, - signer_data: signerData, - }; - return entry; - } + accepted_weight: r.accepted_weight, + rejected_weight: r.rejected_weight, + missing_weight: r.missing_weight, - done(); -}; + signer_data: signerData, + }; + return entry; +} diff --git a/src/api/routes/socket-io.ts b/src/api/routes/socket-io.ts new file mode 100644 index 0000000..2be3c33 --- /dev/null +++ b/src/api/routes/socket-io.ts @@ -0,0 +1,84 @@ +import { Server as HttpServer } from 'http'; +import { Namespace, Server } from 'socket.io'; +import { FastifyPluginAsync } from 'fastify'; +import { TypeBoxTypeProvider } from '@fastify/type-provider-typebox'; +import { SignerMessagesEventPayload } from '../../pg/types'; +import { logger } from '@hirosystems/api-toolkit'; +import { parseDbBlockProposalData } from './block-proposals'; +import { BlockProposalsEntry } from '../schemas'; + +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +export interface ClientToServerEvents {} + +export interface ServerToClientEvents { + blockProposal: (arg: BlockProposalsEntry) => void; +} + +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +interface InterServerEvents {} + +// eslint-disable-next-line @typescript-eslint/no-empty-object-type +interface SocketData {} + +type BlockProposalSocketNamespace = Namespace< + ClientToServerEvents, + ServerToClientEvents, + InterServerEvents, + SocketData +>; + +export const SocketIORoutes: FastifyPluginAsync< + Record, + HttpServer, + TypeBoxTypeProvider +> = async (fastify, _options) => { + const db = fastify.db; + const io = new Server(fastify.server, { + path: fastify.prefix + '/socket.io/', + transports: ['websocket', 'polling'], + }); + + const blockProposalNs = io.of('/block-proposals') as BlockProposalSocketNamespace; + + const signerMessageListener = (msg: SignerMessagesEventPayload) => { + if (blockProposalNs.sockets.size === 0) { + return; + } + // Use Set to get a unique list of block hashes + const blockHashes = new Set( + msg.map(m => ('proposal' in m ? m.proposal.blockHash : m.response.blockHash)) + ); + const proposalBroadcasts = Array.from(blockHashes).map(blockHash => { + return db + .sqlTransaction(async sql => { + const results = await db.getBlockProposal({ + sql, + blockHash, + }); + if (results.length > 0) { + const blockProposal = parseDbBlockProposalData(results[0]); + blockProposalNs.emit('blockProposal', blockProposal); + } + }) + .catch((error: unknown) => { + logger.error(error, `Failed to broadcast block proposal for block hash ${blockHash}`); + }); + }); + void Promise.allSettled(proposalBroadcasts); + }; + + fastify.addHook('onListen', () => { + fastify.db.notifications.events.on('signerMessages', signerMessageListener); + }); + + fastify.addHook('preClose', done => { + fastify.db?.notifications.events.off('signerMessages', signerMessageListener); + io.local.disconnectSockets(true); + done(); + }); + fastify.addHook('onClose', async () => { + await io.close(); + }); + + await Promise.resolve(); +}; diff --git a/src/pg/chainhook/chainhook-pg-store.ts b/src/pg/chainhook/chainhook-pg-store.ts index e8ffa86..e94db6c 100644 --- a/src/pg/chainhook/chainhook-pg-store.ts +++ b/src/pg/chainhook/chainhook-pg-store.ts @@ -8,6 +8,8 @@ import { } from '@hirosystems/api-toolkit'; import { StacksEvent, StacksPayload } from '@hirosystems/chainhook-client'; import { + BlockProposalEventArgs, + BlockResponseEventArgs, DbBlock, DbBlockProposal, DbBlockResponse, @@ -17,6 +19,7 @@ import { DbMockProposal, DbMockSignature, DbRewardSetSigner, + SignerMessagesEventPayload, } from '../types'; import { normalizeHexString, unixTimeMillisecondsToISO, unixTimeSecondsToISO } from '../../helpers'; import { EventEmitter } from 'node:events'; @@ -53,8 +56,13 @@ type MockBlockData = Extract< { type: 'MockBlock' } >['data']; +export type DbWriteEvents = EventEmitter<{ + missingStackerSet: [{ cycleNumber: number }]; + signerMessages: [SignerMessagesEventPayload]; +}>; + export class ChainhookPgStore extends BasePgStoreModule { - readonly events = new EventEmitter<{ missingStackerSet: [{ cycleNumber: number }] }>(); + readonly events: DbWriteEvents = new EventEmitter(); readonly logger = defaultLogger.child({ module: 'ChainhookPgStore' }); constructor(db: BasePgStore) { @@ -62,6 +70,8 @@ export class ChainhookPgStore extends BasePgStoreModule { } async processPayload(payload: StacksPayload): Promise { + const appliedSignerMessageResults: SignerMessagesEventPayload = []; + await this.sqlWriteTransaction(async sql => { for (const block of payload.rollback) { this.logger.info(`ChainhookPgStore rollback block ${block.block_identifier.index}`); @@ -97,12 +107,21 @@ export class ChainhookPgStore extends BasePgStoreModule { for (const event of payload.events) { if (event.payload.type === 'SignerMessage') { - await this.applySignerMessageEvent(sql, event); + const applyResults = await this.applySignerMessageEvent(sql, event); + appliedSignerMessageResults.push(...applyResults); } else { this.logger.error(`Unknown chainhook payload event type: ${event.payload.type}`); } } }); + + // After the sql transaction is complete, emit events for the applied signer messages. + // Use setTimeout to break out of the call stack so caller is not blocked by event listeners. + if (appliedSignerMessageResults.length > 0) { + setTimeout(() => { + this.events.emit('signerMessages', appliedSignerMessageResults); + }); + } } async updateChainTipBlockHeight(blockHeight: number): Promise { @@ -114,24 +133,45 @@ export class ChainhookPgStore extends BasePgStoreModule { return result[0].block_height; } - private async applySignerMessageEvent(sql: PgSqlClient, event: SignerMessage) { + private async applySignerMessageEvent( + sql: PgSqlClient, + event: SignerMessage + ): Promise { + const appliedResults: SignerMessagesEventPayload = []; switch (event.payload.data.message.type) { case 'BlockProposal': { - await this.applyBlockProposal( + const res = await this.applyBlockProposal( sql, event.received_at_ms, event.payload.data.pubkey, event.payload.data.message.data ); + if (res.applied) { + appliedResults.push({ + proposal: { + receiptTimestamp: event.received_at_ms, + blockHash: res.blockHash, + }, + }); + } break; } case 'BlockResponse': { - await this.applyBlockResponse( + const res = await this.applyBlockResponse( sql, event.received_at_ms, event.payload.data.pubkey, event.payload.data.message.data ); + if (res.applied) { + appliedResults.push({ + response: { + receiptTimestamp: event.received_at_ms, + blockHash: res.blockHash, + signerKey: res.signerKey, + }, + }); + } break; } case 'BlockPushed': { @@ -171,6 +211,7 @@ export class ChainhookPgStore extends BasePgStoreModule { break; } } + return appliedResults; } private async applyMockBlock( @@ -320,13 +361,14 @@ export class ChainhookPgStore extends BasePgStoreModule { receivedAt: number, minerPubkey: string, messageData: BlockProposalData - ) { + ): Promise<{ applied: false } | { applied: true; blockHash: string }> { + const blockHash = normalizeHexString(messageData.block.block_hash); const dbBlockProposal: DbBlockProposal = { received_at: unixTimeMillisecondsToISO(receivedAt), miner_key: normalizeHexString(minerPubkey), block_height: messageData.block.header.chain_length, block_time: unixTimeSecondsToISO(messageData.block.header.timestamp), - block_hash: normalizeHexString(messageData.block.block_hash), + block_hash: blockHash, index_block_hash: normalizeHexString(messageData.block.index_block_hash), reward_cycle: messageData.reward_cycle, burn_block_height: messageData.burn_height, @@ -339,11 +381,37 @@ export class ChainhookPgStore extends BasePgStoreModule { this.logger.info( `Skipped inserting duplicate block proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}` ); - } else { - this.logger.info( - `ChainhookPgStore apply block_proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}` - ); + return { applied: false }; } + this.logger.info( + `ChainhookPgStore apply block_proposal height=${dbBlockProposal.block_height}, hash=${dbBlockProposal.block_hash}` + ); + return { applied: true, blockHash }; + } + + async deleteBlockProposal(sql: PgSqlClient, blockHash: string): Promise { + const result = await sql` + DELETE FROM block_proposals WHERE block_hash = ${blockHash} RETURNING * + `; + if (result.length === 0) { + throw new Error(`Block proposal not found for hash ${blockHash}`); + } + // copy the result to a new object to remove the id field + const proposal = { ...result[0], id: undefined }; + delete proposal.id; + return proposal; + } + + async deleteBlockResponses(sql: PgSqlClient, blockHash: string): Promise { + const result = await sql` + DELETE FROM block_responses WHERE signer_sighash = ${blockHash} RETURNING * + `; + // copy the results to a new object to remove the id field + return result.map(r => { + const response = { ...r, id: undefined }; + delete response.id; + return response; + }); } private async applyBlockResponse( @@ -351,7 +419,7 @@ export class ChainhookPgStore extends BasePgStoreModule { receivedAt: number, signerPubkey: string, messageData: BlockResponseData - ) { + ): Promise<{ applied: false } | { applied: true; blockHash: string; signerKey: string }> { if (messageData.type !== 'Accepted' && messageData.type !== 'Rejected') { this.logger.error(messageData, `Unexpected BlockResponse type`); } @@ -368,12 +436,13 @@ export class ChainhookPgStore extends BasePgStoreModule { rejectCode = rejectReason[RejectReasonValidationFailed]; } } - + const blockHash = normalizeHexString(messageData.data.signer_signature_hash); + const signerKey = normalizeHexString(signerPubkey); const dbBlockResponse: DbBlockResponse = { received_at: unixTimeMillisecondsToISO(receivedAt), - signer_key: normalizeHexString(signerPubkey), + signer_key: signerKey, accepted: accepted, - signer_sighash: normalizeHexString(messageData.data.signer_signature_hash), + signer_sighash: blockHash, metadata_server_version: messageData.data.metadata.server_version, signature: messageData.data.signature, reason_string: accepted ? null : messageData.data.reason, @@ -390,11 +459,12 @@ export class ChainhookPgStore extends BasePgStoreModule { this.logger.info( `Skipped inserting duplicate block response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}` ); - } else { - this.logger.info( - `ChainhookPgStore apply block_response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}` - ); + return { applied: false }; } + this.logger.info( + `ChainhookPgStore apply block_response signer=${dbBlockResponse.signer_key}, hash=${dbBlockResponse.signer_sighash}` + ); + return { applied: true, blockHash, signerKey }; } private async updateStacksBlock( @@ -493,8 +563,8 @@ export class ChainhookPgStore extends BasePgStoreModule { this.logger.warn( `Missing reward set signers for cycle ${cycle_number} in block ${dbBlock.block_height}` ); - // Use setImmediate to ensure we break out of the current sql transaction within the async context - setImmediate(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number })); + // Use setTimeout to ensure we break out of the current sql transaction within the async context + setTimeout(() => this.events.emit('missingStackerSet', { cycleNumber: cycle_number })); } this.logger.info( `ChainhookPgStore apply block ${dbBlock.block_height} ${dbBlock.block_hash}` diff --git a/src/pg/notifications/pg-notifications.ts b/src/pg/notifications/pg-notifications.ts new file mode 100644 index 0000000..c746107 --- /dev/null +++ b/src/pg/notifications/pg-notifications.ts @@ -0,0 +1,66 @@ +import { + BasePgStore, + BasePgStoreModule, + batchIterate, + logger as defaultLogger, + PgSqlClient, +} from '@hirosystems/api-toolkit'; +import { EventEmitter } from 'node:events'; +import { DbWriteEvents } from '../chainhook/chainhook-pg-store'; +import { SignerMessagesEventPayload } from '../types'; + +export type DbListenEvents = EventEmitter<{ + signerMessages: [SignerMessagesEventPayload]; +}>; + +const SQL_NOTIFIY_BLOCK_PROPOSAL_CHANNEL = 'block_proposal'; + +export class NotificationPgStore extends BasePgStoreModule { + readonly events: DbListenEvents = new EventEmitter(); + readonly logger = defaultLogger.child({ module: 'NotificationPgStore' }); + readonly dbWriteEvents: DbWriteEvents; + readonly rawSqlClient: PgSqlClient; + + _sqlNotifyDisabled = false; + + constructor(db: BasePgStore, rawSqlClient: PgSqlClient, dbWriteEvents: DbWriteEvents) { + super(db); + this.rawSqlClient = rawSqlClient; + this.dbWriteEvents = dbWriteEvents; + this.subscribeToDbWriteEvents(); + this.subscribeToDbListenEvents(); + } + + private subscribeToDbWriteEvents() { + this.dbWriteEvents.on('signerMessages', msg => { + if (this._sqlNotifyDisabled) { + return; + } + // Split the messages into batches to avoid exceeding the maximum pg notify payload size + for (const batch of batchIterate(msg, 25, false)) { + this.rawSqlClient + .notify(SQL_NOTIFIY_BLOCK_PROPOSAL_CHANNEL, JSON.stringify(batch)) + .catch((error: unknown) => { + this.logger.error(error, 'Failed to sql.notify signerMessages'); + }); + } + }); + } + + private subscribeToDbListenEvents() { + this.rawSqlClient + .listen( + SQL_NOTIFIY_BLOCK_PROPOSAL_CHANNEL, + payload => { + const signerMessages = JSON.parse(payload) as SignerMessagesEventPayload; + setTimeout(() => this.events.emit('signerMessages', signerMessages)); + }, + () => { + this.logger.info('Subscribed to sql.listen block proposal notifications'); + } + ) + .catch((error: unknown) => { + this.logger.error(error, 'Failed to sql.listen block proposal notifications'); + }); + } +} diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts index 212f133..fc2d954 100644 --- a/src/pg/pg-store.ts +++ b/src/pg/pg-store.ts @@ -12,6 +12,7 @@ import { ChainhookPgStore } from './chainhook/chainhook-pg-store'; import { BlockIdParam, normalizeHexString, sleep } from '../helpers'; import { Fragment } from 'postgres'; import { DbBlockProposalQueryResponse } from './types'; +import { NotificationPgStore } from './notifications/pg-notifications'; export const MIGRATIONS_DIR = path.join(__dirname, '../../migrations'); @@ -20,6 +21,7 @@ export const MIGRATIONS_DIR = path.join(__dirname, '../../migrations'); */ export class PgStore extends BasePgStore { readonly chainhook: ChainhookPgStore; + readonly notifications: NotificationPgStore; static async connect(opts?: { skipMigrations?: boolean; @@ -43,6 +45,7 @@ export class PgStore extends BasePgStore { maxLifetime: ENV.PG_MAX_LIFETIME, }, }); + if (pgConfig.schema && opts?.createSchema !== false) { await sql`CREATE SCHEMA IF NOT EXISTS ${sql(pgConfig.schema)}`; } @@ -67,6 +70,7 @@ export class PgStore extends BasePgStore { constructor(sql: PgSqlClient) { super(sql); this.chainhook = new ChainhookPgStore(this); + this.notifications = new NotificationPgStore(this, sql, this.chainhook.events); } async getChainTipBlockHeight(): Promise { diff --git a/src/pg/types.ts b/src/pg/types.ts index 762fe44..af7cf2e 100644 --- a/src/pg/types.ts +++ b/src/pg/types.ts @@ -26,7 +26,7 @@ export type DbRewardSetSigner = { }; export type DbBlockResponse = { - received_at: string; + received_at: string | Date; signer_key: PgBytea; accepted: boolean; signer_sighash: PgBytea; @@ -39,10 +39,10 @@ export type DbBlockResponse = { }; export type DbBlockProposal = { - received_at: string; + received_at: string | Date; miner_key: PgBytea; block_height: number; - block_time: string; + block_time: string | Date; block_hash: PgBytea; index_block_hash: PgBytea; burn_block_height: number; @@ -148,3 +148,22 @@ export type DbBlockProposalQueryResponse = { reject_code: string | null; }[]; }; + +export type BlockProposalEventArgs = { + receiptTimestamp: number; + blockHash: string; +}; +export type BlockResponseEventArgs = { + receiptTimestamp: number; + blockHash: string; + signerKey: string; +}; + +export type SignerMessagesEventPayload = ( + | { + proposal: BlockProposalEventArgs; + } + | { + response: BlockResponseEventArgs; + } +)[]; diff --git a/src/stacks-core-rpc/stacker-set-updater.ts b/src/stacks-core-rpc/stacker-set-updater.ts index 534fb34..3a9d6d2 100644 --- a/src/stacks-core-rpc/stacker-set-updater.ts +++ b/src/stacks-core-rpc/stacker-set-updater.ts @@ -81,7 +81,7 @@ export class StackerSetUpdator { `Failed to fetch stacker set for cycle ${cycleNumber}, retrying in ${FETCH_STACKER_SET_RETRY_INTERVAL_MS}ms ...` ); await sleep(FETCH_STACKER_SET_RETRY_INTERVAL_MS, this.abortController.signal); - setImmediate(() => { + setTimeout(() => { this.queuedCycleNumbers.delete(cycleNumber); this.add({ cycleNumber }); }); diff --git a/tests/db/db-notifications.test.ts b/tests/db/db-notifications.test.ts new file mode 100644 index 0000000..f6c66ef --- /dev/null +++ b/tests/db/db-notifications.test.ts @@ -0,0 +1,124 @@ +import * as fs from 'node:fs'; +import * as readline from 'node:readline/promises'; +import * as zlib from 'node:zlib'; +import { once } from 'node:events'; +import { FastifyInstance } from 'fastify'; +import { StacksPayload } from '@hirosystems/chainhook-client'; +import { buildApiServer } from '../../src/api/init'; +import { PgStore } from '../../src/pg/pg-store'; +import { BlockProposalsEntry } from '../../src/api/schemas'; +import { PoxInfo, RpcStackerSetResponse } from '../../src/stacks-core-rpc/stacks-core-rpc-client'; +import { rpcStackerSetToDbRewardSetSigners } from '../../src/stacks-core-rpc/stacker-set-updater'; +import { SignerMessagesEventPayload } from '../../src/pg/types'; +import { sleep } from '../../src/helpers'; +import { io, Socket } from 'socket.io-client'; +import { ClientToServerEvents, ServerToClientEvents } from '../../src/api/routes/socket-io'; + +describe('Db notifications tests', () => { + let db: PgStore; + let apiServer: FastifyInstance; + + let socketClient: Socket; + + const testingBlockHash = '0x2f1c4e83fda403682b1ab5dd41383e47d2cb3dfec0fd26f0886883462d7802fb'; + let proposalTestPayload: StacksPayload; + + beforeAll(async () => { + db = await PgStore.connect(); + db.notifications._sqlNotifyDisabled = true; + apiServer = await buildApiServer({ db }); + await apiServer.listen({ port: 0, host: '127.0.0.1' }); + + // insert pox-info dump + const poxInfoDump = JSON.parse( + fs.readFileSync('./tests/dumps/dump-pox-info-2024-11-02.json', 'utf8') + ) as PoxInfo; + await db.updatePoxInfo(poxInfoDump); + + // insert stacker-set dump + const stackerSetDump = JSON.parse( + fs.readFileSync('./tests/dumps/dump-stacker-set-cycle-72-2024-11-02.json', 'utf8') + ) as RpcStackerSetResponse; + await db.chainhook.insertRewardSetSigners( + db.sql, + rpcStackerSetToDbRewardSetSigners(stackerSetDump, 72) + ); + + // insert chainhook-payloads dump + const spyInfoLog = jest.spyOn(db.chainhook.logger, 'info').mockImplementation(() => {}); // Surpress noisy logs during bulk insertion test + const payloadDumpFile = './tests/dumps/dump-chainhook-payloads-2024-11-02.ndjson.gz'; + const rl = readline.createInterface({ + input: fs.createReadStream(payloadDumpFile).pipe(zlib.createGunzip()), + crlfDelay: Infinity, + }); + for await (const line of rl) { + const payload = JSON.parse(line) as StacksPayload; + // find and store the test block proposal payload for later testing + if ( + !proposalTestPayload && + payload.events.find( + event => + event.payload.data.message.type === 'BlockProposal' && + event.payload.data.message.data.block.block_hash === testingBlockHash + ) + ) { + proposalTestPayload = payload; + } + await db.chainhook.processPayload(payload); + } + rl.close(); + spyInfoLog.mockRestore(); + + socketClient = io(`ws://127.0.0.1:${apiServer.addresses()[0].port}/block-proposals`, { + path: '/signer-metrics/socket.io/', + }); + await new Promise((resolve, reject) => { + socketClient.on('connect', resolve); + socketClient.io.on('error', reject); + }); + db.notifications._sqlNotifyDisabled = false; + }); + + afterAll(async () => { + socketClient.disconnect(); + await apiServer.close(); + await db.close(); + }); + + test('test block proposal write events', async () => { + const pgNotifyEvent: Promise = once( + db.notifications.events, + 'signerMessages' + ); + + const initialWriteEvent: Promise = once( + db.chainhook.events, + 'signerMessages' + ); + + const clientSocketEvent = new Promise(resolve => { + socketClient.on('blockProposal', data => { + resolve(data); + }); + }); + + // delete block proposal from db, returning the data so we can re-write it + const blockProposal = await db.chainhook.deleteBlockProposal(db.sql, testingBlockHash); + expect(blockProposal.block_hash).toBe(testingBlockHash); + + const blockResponses = await db.chainhook.deleteBlockResponses(db.sql, testingBlockHash); + expect(blockResponses.length).toBeGreaterThan(0); + expect(blockResponses[0].signer_sighash).toBe(testingBlockHash); + + await db.chainhook.processPayload(proposalTestPayload); + + const promiseResults = await Promise.all([pgNotifyEvent, initialWriteEvent, clientSocketEvent]); + expect( + promiseResults[0][0].find(r => 'proposal' in r && r.proposal.blockHash === testingBlockHash) + ).toBeTruthy(); + expect( + promiseResults[1][0].find(r => 'proposal' in r && r.proposal.blockHash === testingBlockHash) + ).toBeTruthy(); + expect(promiseResults[2].block_hash).toBe(testingBlockHash); + }); +}); diff --git a/tests/db/duplicate-signer-set-insert.test.ts b/tests/db/duplicate-signer-set-insert.test.ts index 3e2a931..71aaf55 100644 --- a/tests/db/duplicate-signer-set-insert.test.ts +++ b/tests/db/duplicate-signer-set-insert.test.ts @@ -8,6 +8,7 @@ describe('Duplicate signer set insert', () => { beforeAll(async () => { db = await PgStore.connect(); + db.notifications._sqlNotifyDisabled = true; }); afterAll(async () => { diff --git a/tests/db/endpoints.test.ts b/tests/db/endpoints.test.ts index be28b68..fb67d78 100644 --- a/tests/db/endpoints.test.ts +++ b/tests/db/endpoints.test.ts @@ -28,6 +28,7 @@ describe('Endpoint tests', () => { beforeAll(async () => { db = await PgStore.connect(); + db.notifications._sqlNotifyDisabled = true; apiServer = await buildApiServer({ db }); await apiServer.listen({ port: 0, host: '127.0.0.1' }); diff --git a/tests/db/ingestion.test.ts b/tests/db/ingestion.test.ts index 6a505af..c373b24 100644 --- a/tests/db/ingestion.test.ts +++ b/tests/db/ingestion.test.ts @@ -8,6 +8,7 @@ describe('Postgres ingestion tests', () => { let db: PgStore; beforeAll(async () => { db = await PgStore.connect(); + db.notifications._sqlNotifyDisabled = true; }); afterAll(async () => { diff --git a/tests/db/jest-global-teardown.ts b/tests/db/jest-global-teardown.ts index fc3daba..04d4d58 100644 --- a/tests/db/jest-global-teardown.ts +++ b/tests/db/jest-global-teardown.ts @@ -9,7 +9,7 @@ export default async function teardown(): Promise { const docker = new Docker(); const container = docker.getContainer(containerId); await container.stop(); - await container.remove(); + await container.remove({ v: true }); console.log(`PostgreSQL container stopped and removed`); } }