Skip to content

Commit

Permalink
feat: ✨ API: Add graceful shutdown/reload code
Browse files Browse the repository at this point in the history
  • Loading branch information
JesseTheRobot committed Dec 18, 2023
1 parent b8eaebc commit 8dd3ae5
Show file tree
Hide file tree
Showing 5 changed files with 319 additions and 4 deletions.
26 changes: 26 additions & 0 deletions ecosystem.config.cjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
const { config } = require('dotenv');
config();

module.exports =
// ecosystem.js
{
apps: [
{
name: 'Indexer',
script: 'dist/app.js', // name of the startup file
exec_mode: 'fork', // to turn on cluster mode; defaults to 'fork' mode
env: {
PORT: '10001', // the port on which the app should listen
},
},
{
name: 'HttpServer',
script: 'dist/HttpServer.js',
instances: 2,
exec_mode: 'cluster',
env: {
PORT: '10000', // the port on which the app should listen
},
},
],
};
3 changes: 3 additions & 0 deletions scripts/test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import { contiguousDataCacheTmpCleanupWorker } from '../src/system';

const batch = await contiguousDataCacheTmpCleanupWorker.getBatch('data');
73 changes: 69 additions & 4 deletions src/HttpServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,36 @@ import { dataRouter } from './routes/data/index.js';
import { apolloServer } from './routes/graphql/index.js';
import { openApiRouter } from './routes/openapi.js';
import * as system from './system.js';
import { GracefulShutdownController } from './utils/gracefulShutdown.js';

const logger = log.child({ name: 'HttpServer' });
let isShuttingDown = false;
let manualShutdown = () => {};

const gracefulShutdown = async (): Promise<void> => {
logger.warn('Shutting down...');
Promise.all([manualShutdown()])
.then((_) => {
process.exit(0);
})
.catch((_) => process.exit(1));
};

process.on('message', async (message) => {
if (message === 'shutdown') {
logger.verbose('received shutdown message');
if (isShuttingDown) return;
isShuttingDown = true;
gracefulShutdown();
}
});

process.on('SIGINT', () => {
logger.verbose('received signal sigint');
if (isShuttingDown) return;
isShuttingDown = true;
gracefulShutdown();
});

const app = express();

Expand All @@ -29,12 +59,47 @@ const apolloServerInstanceGql = apolloServer(system.db, {
introspection: true,
persistedQueries: false,
});
apolloServerInstanceGql.start().then(() => {
await apolloServerInstanceGql.start().then(() => {
apolloServerInstanceGql.applyMiddleware({
app,
path: '/graphql',
});
app.listen(config.PORT, () => {
log.info(`Listening on port ${config.PORT}`);
});
});

const server = app.listen(config.PORT);
logger.info(`Listening on port ${config.PORT}`);

server.keepAliveTimeout = 61 * 1000;
server.headersTimeout = 62 * 1000;

server.on('close', () => {
logger.debug(`closing...`);
});

// // eslint-disable-next-line @typescript-eslint/naming-convention
// async function onHTTPShutdown(): Promise<void> {
// // insert cleanup operation(s) here
// const cleanup = async (): Promise<void> => {
// await new Promise((r) => wss.close(r));
// };
// // await Promise.allSettled(wss.clients)
// await Promise.race([sleep(5_000), cleanup]);
// }

const controller = new GracefulShutdownController({
server,
preShutdown: async (): Promise<void> => {
server.closeIdleConnections(); // TODO: test me!!
},
});
// overwrite manual shutdown noop method
manualShutdown = (): Promise<void> => controller.shutdown();

if (process.send) {
process?.send?.('ready', undefined, undefined, (e) =>
e ? logger.error(`Error sending ready message: ${e}`) : undefined,
); // send ready pm2 message
logger.info(`Ready message sent`);
} else {
logger.warn(`Ready message NOT sent`);
}
219 changes: 219 additions & 0 deletions src/utils/gracefulShutdown.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
import type { ServerResponse } from 'http';
import { type Server } from 'http';
import { Logger } from 'winston';

import logger from '../log.js';
import { sleep } from './utils.js';

export class GracefulShutdownController {
private timeout = 30_000;
private connections = new Map();
private secureConnections = new Map();
private server: Server;
private connectionCounter = 0;
private secureConnectionCounter = 0;
private shutdownPromise?: void | PromiseLike<void>;
private preShutdown?: () => Promise<void>;
protected log: Logger;

constructor(opts: { server: Server; preShutdown?: () => Promise<void> }) {
this.server = opts.server;
this.server.on('connection', this.connectionEventHandler.bind(this));
this.server.on('request', this.requestEventHandler.bind(this));
this.server.on('secureConnection', this.secureConnectionHandler.bind(this));
this.log = logger.child({ class: this.constructor.name });
}

get isShuttingDown(): boolean {
return this.shutdownPromise !== undefined;
}

requestEventHandler(req: any, res: ServerResponse): void {
req.socket._isIdle = false;
if (this.isShuttingDown) {
// this.lastConnection = performance.now();
this.log.warn('Received request while shutting down');
}

if (this.isShuttingDown && !res.headersSent) {
res.setHeader('connection', 'close');
}

res.on('finish', function (this: ServerResponse) {
req.socket._isIdle = true;
this.destroy(req.socket);
});
}

connectionEventHandler(socket: any): void {
if (this.isShuttingDown) {
// this.log.warn(`REJECTED INCOMING CONNECTION`);
// socket.destroy();
this.log.warn('Received request while shutting down');
// this.lastConnection = performance.now();
}
const id = this.connectionCounter++;
socket._isIdle = true;
socket._connectionId = id;
this.connections.set(id, socket);

socket.once('close', () => {
this.connections.delete(socket._connectionId);
});
}

// destroy(socket: Socket & { _connectionId: string; _isIdle: boolean; server: any }, force = false): void {
// if ((socket._isIdle && this.isShuttingDown) || force) {
// this.log.info("DESTROY");
// socket.destroy();
// if (socket.server instanceof Server) {
// this.connections.delete(socket._connectionId);
// } else {
// this.secureConnections.delete(socket._connectionId);
// }
// }
// }

secureConnectionHandler(socket: any): void {
// if (this.isShuttingDown) {
// this.log.warn(`REJECTED INCOMING CONNECTION`);
// socket.destroy();
// } else {
const id = this.secureConnectionCounter++;
socket._isIdle = true;
socket._connectionId = id;
this.secureConnections.set(id, socket);

socket.once('close', () => {
this.secureConnections.delete(socket._connectionId);
});
// }
}

// returns true if should force shut down. returns false for shut down without force
async waitForReadyToShutDown(totalNumInterval: number): Promise<void> {
while (totalNumInterval-- > 0) {
this.log.debug(`waitForReadyToShutDown... ${totalNumInterval}`);

if (totalNumInterval === 0) {
// timeout reached
this.log.warn(
`Could not close connections in time (${this.timeout}ms), will forcefully shut down`,
);
return;
}

// const symb = Object.getOwnPropertySymbols(this.server).find((v) => v.toString() === "Symbol(http.server.connections)");
// const connectionsList = this.server[symb];

// const activeConnections = connectionsList.active();

// test all connections closed already?
const allConnectionsClosed =
this.connections.size === 0 && this.secureConnections.size === 0;

if (allConnectionsClosed) {
this.log.debug('All connections closed. Continue to shutting down');
// use this if issues persist.

// if (cluster.isWorker) {
// const worker = cluster.worker;
// // console.log(worker);
// console.log("DISCONNECT");
// worker.disconnect();
// }

// const timeSinceLastConn = performance.now() - this.lastConnection;
// // console.log("timeSinceLastConn", timeSinceLastConn);

// while (performance.now() - this.lastConnection < 250) {
// await sleep(50);
// this.log.info(`BusyWait for no connections...`);
// }
// this.log.info("busywait done");

return;
}

this.log.debug('Schedule the next waitForReadyToShutdown');
await sleep(250);
}
}

async destroyAllConnections(force = false): Promise<void> {
// destroy empty and idle connections / all connections (if force = true)
this.log.debug(
'Destroy Connections : ' + (force ? 'forced close' : 'close'),
);

const httpServerConnections = await new Promise((res, rej) =>
this.server.getConnections((e, c) => {
if (e != undefined) rej(e);
if (c != undefined) res(c);
}),
);

this.log.debug(
`server has ${this.server.connections} (${httpServerConnections}) connections`,
);

for (const socket of this.connections.values()) {
const serverResponse = socket._httpMessage;
// send connection close header to open connections
if (serverResponse && !force) {
if (!serverResponse.headersSent) {
serverResponse.setHeader('connection', 'close');
}
}
}

this.log.debug('Connection Counter : ' + this.connectionCounter);

for (const socket of this.secureConnections.values()) {
const serverResponse = socket._httpMessage;
// send connection close header to open connections
if (serverResponse && !force) {
if (!serverResponse.headersSent) {
serverResponse.setHeader('connection', 'close');
}
}
}

this.log.debug(
'Secure Connection Counter : ' + this.secureConnectionCounter,
);
// const symb = Object.getOwnPropertySymbols(this.server).find((v) => v.toString() === "Symbol(http.server.connections)");
// const connectionsList = this.server[symb];

// const idleConnections = connectionsList.idle();

// for (const connection of idleConnections) {
// connection.socket.destroy();
// }
}

private async runShutdown(signal: string): Promise<void> {
this.log.info(`shutting down with signal ${signal}`);
if (this.preShutdown instanceof Function) await this.preShutdown();

// if (cluster.isWorker) {
// // this *should* cause connection distribution to fail sending reqs to this worker
// // see https://github.com/nodejs/node/blob/33710e7e7d39d19449a75911537d630349110a0c/lib/internal/cluster/child.js#L236
// this.server.maxConnections = null;
// }

await this.destroyAllConnections();
await this.waitForReadyToShutDown(Math.round(this.timeout / 250));
this.log.verbose(`Closing server`);
await new Promise<void>((res, rej) =>
this.server.close((err) => (err ? rej(err) : res())),
).catch((e) =>
this.log.error(`Error closing the server: ${e.toString()} ${e.stack}`),
);
}

public async shutdown(signal = 'manual'): Promise<void> {
if (!this.isShuttingDown) this.shutdownPromise = this.runShutdown(signal);
return this.shutdownPromise;
}
}
2 changes: 2 additions & 0 deletions src/utils/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export const sleep = (ms: number): Promise<void> =>
new Promise((resolve) => setTimeout(resolve, ms));

0 comments on commit 8dd3ae5

Please sign in to comment.