diff --git a/src/helpers/constants.ts b/src/helpers/constants.ts index 79f2ca8..f489605 100644 --- a/src/helpers/constants.ts +++ b/src/helpers/constants.ts @@ -1,13 +1,25 @@ +const enum PubSubRetryCodes { + CANCELLED = 1, + UNKNOWN = 2, + DEADLINE_EXCEEDED = 4, + NOT_FOUND = 5, + PERMISSION_DENIED = 7, + RESOURCE_EXHAUSTED = 8, + ABORTED = 10, + INTERNAL = 13, + UNAVALIBLE = 14, +} + export const PUB_SUB_DEFAULT_RETRY_CODES = [ - 10, // 'ABORTED' - 1, // 'CANCELLED', - 4, // 'DEADLINE_EXCEEDED' - 13, // 'INTERNAL' - 8, // 'RESOURCE_EXHAUSTED' - 14, // 'UNAVAILABLE' - 2, // 'UNKNOWN' - 5, // NOT_FOUND' - 7, // PERMISSION_DENIED + PubSubRetryCodes.CANCELLED, + PubSubRetryCodes.UNKNOWN, + PubSubRetryCodes.DEADLINE_EXCEEDED, + PubSubRetryCodes.NOT_FOUND, + PubSubRetryCodes.PERMISSION_DENIED, + PubSubRetryCodes.RESOURCE_EXHAUSTED, + PubSubRetryCodes.ABORTED, + PubSubRetryCodes.INTERNAL, + PubSubRetryCodes.UNAVALIBLE, ] export const PUB_SUB_DEFAULT_BACKOFF_SETTINGS = { @@ -20,6 +32,8 @@ export const PUB_SUB_DEFAULT_BACKOFF_SETTINGS = { totalTimeoutMillis: 600000, } -export const MESSAGE = 'message' -export const ERROR = 'error' -export const CLOSE = 'close' +export const enum EVENT { + MESSAGE = 'message', + ERROR = 'error', + CLOSE = 'close', +} diff --git a/src/helpers/testHelpers.ts b/src/helpers/testHelpers.ts index 17deeea..97dd308 100644 --- a/src/helpers/testHelpers.ts +++ b/src/helpers/testHelpers.ts @@ -1,10 +1,11 @@ -import { PublishOptions } from '@google-cloud/pubsub/build/src/topic' -import { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface' +import type { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber' +import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic' + +import type { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface' import { PUB_SUB_DEFAULT_RETRY_CODES, PUB_SUB_DEFAULT_BACKOFF_SETTINGS } from './constants' -import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber' export const mockGoogleAuthOptions: GoogleAuthOptions = { - projectId: 'entitlement', + projectId: 'projectId', } export const mockPublishOptions: PublishOptions = { diff --git a/src/interfaces/gcloud-pub-sub.interface.ts b/src/interfaces/gcloud-pub-sub.interface.ts index 3352694..d543d85 100644 --- a/src/interfaces/gcloud-pub-sub.interface.ts +++ b/src/interfaces/gcloud-pub-sub.interface.ts @@ -1,7 +1,7 @@ -import { Type } from '@nestjs/common' -import { ModuleMetadata } from '@nestjs/common/interfaces' -import { PublishOptions } from '@google-cloud/pubsub/build/src/topic' -import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber' +import type { Type } from '@nestjs/common' +import type { ModuleMetadata } from '@nestjs/common/interfaces' +import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic' +import type { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber' export interface GCloudPubSubServerOptions { authOptions: GoogleAuthOptions diff --git a/src/microservice/gcloud-pub-sub.server.spec.ts b/src/microservice/gcloud-pub-sub.server.spec.ts index deac421..342d350 100644 --- a/src/microservice/gcloud-pub-sub.server.spec.ts +++ b/src/microservice/gcloud-pub-sub.server.spec.ts @@ -46,24 +46,18 @@ describe('GCloudPubSubServer', () => { it('Instantiates', () => { expect(server.client).toBe(null) expect(server.subscriptions.length).toBe(0) - expect(server.options).toMatchInlineSnapshot(` - { - "authOptions": { - "projectId": "entitlement", - }, - "subscriberOptions": { - "flowControl": { - "allowExcessMessages": false, - "maxMessages": 5, - }, - }, - "subscriptionIds": [ - "create", - "update", - "delete", - ], - } - `) + expect(server.options).toEqual({ + authOptions: { + projectId: 'projectId', + }, + subscriberOptions: { + flowControl: { + allowExcessMessages: false, + maxMessages: 5, + }, + }, + subscriptionIds: ['create', 'update', 'delete'], + }) }) it('Instantiates without subscriberOptions', () => { @@ -73,18 +67,12 @@ describe('GCloudPubSubServer', () => { }) expect(server2.client).toBe(null) expect(server2.subscriptions.length).toBe(0) - expect(server2.options).toMatchInlineSnapshot(` - { - "authOptions": { - "projectId": "entitlement", - }, - "subscriptionIds": [ - "create", - "update", - "delete", - ], - } - `) + expect(server2.options).toEqual({ + authOptions: { + projectId: 'projectId', + }, + subscriptionIds: ['create', 'update', 'delete'], + }) }) describe('listen', () => { @@ -210,6 +198,7 @@ describe('GCloudPubSubServer', () => { // @ts-ignore const handleErrorFunction = server.handleErrorFactory(subscription, subscriptionName) + // @ts-ignore handleErrorFunction(error) jest.advanceTimersByTime(5000) @@ -235,6 +224,7 @@ describe('GCloudPubSubServer', () => { // @ts-ignore const handleErrorFunction = server.handleErrorFactory(subscription, subscriptionName) + // @ts-ignore handleErrorFunction(error) jest.advanceTimersByTime(5000) diff --git a/src/microservice/gcloud-pub-sub.server.ts b/src/microservice/gcloud-pub-sub.server.ts index a0cc265..fc3831c 100644 --- a/src/microservice/gcloud-pub-sub.server.ts +++ b/src/microservice/gcloud-pub-sub.server.ts @@ -1,15 +1,12 @@ import { PubSub, Subscription, Message } from '@google-cloud/pubsub' import { Server, CustomTransportStrategy } from '@nestjs/microservices' -import { MESSAGE, ERROR, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants' +import { EVENT, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants' import { GCloudPubSubServerOptions } from '../interfaces/gcloud-pub-sub.interface' const RETRY_INTERVAL = 5000 -export class GCloudPubSubServer - extends Server - implements CustomTransportStrategy -{ +export class GCloudPubSubServer extends Server implements CustomTransportStrategy { public client: PubSub = null public subscriptions: Subscription[] = [] public isShuttingDown: boolean = false @@ -28,15 +25,15 @@ export class GCloudPubSubServer ) const handleMessage = this.handleMessageFactory(subcriptionName) const handleError = this.handleErrorFactory(subscription, subcriptionName) - subscription.on(MESSAGE, handleMessage.bind(this)) - subscription.on(ERROR, handleError) + subscription.on(EVENT.MESSAGE, handleMessage.bind(this)) + subscription.on(EVENT.ERROR, handleError.bind(this)) this.subscriptions.push(subscription) }) callback() } public handleErrorFactory(subscription: Subscription, subcriptionName: string) { - return (error): void => { + const handleError = (error: { code: number } & string): void => { this.handleError(error) if (!this.isShuttingDown && PUB_SUB_DEFAULT_RETRY_CODES.includes(error.code)) { this.logger.warn(`Closing subscription: ${subcriptionName}`) @@ -47,18 +44,17 @@ export class GCloudPubSubServer }, RETRY_INTERVAL) } } + return handleError } public close() { this.isShuttingDown = true - this.subscriptions.forEach((subscription) => { - subscription.close() - }) + this.subscriptions.forEach((subscription) => subscription.close()) } public handleMessageFactory(subscriptionName: string) { - return async (message: Message) => { - const handler = this.getHandlerByPattern(subscriptionName) + const handler = this.getHandlerByPattern(subscriptionName) + const handleMessage = async (message: Message) => { if (!handler) { this.logger.warn(`ack message with no active handler: ${message.id}`) message.ack() @@ -66,5 +62,6 @@ export class GCloudPubSubServer } await handler(message) } + return handleMessage } } diff --git a/src/module/gcloud-pub-sub.module.ts b/src/module/gcloud-pub-sub.module.ts index 4fea8e3..4423c2e 100644 --- a/src/module/gcloud-pub-sub.module.ts +++ b/src/module/gcloud-pub-sub.module.ts @@ -1,5 +1,6 @@ -import { DynamicModule, Module, Provider } from '@nestjs/common' -import { PublishOptions } from '@google-cloud/pubsub/build/src/topic' +import { Module } from '@nestjs/common' +import type { DynamicModule, Provider } from '@nestjs/common' +import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic' import { GcloudPubSubService } from './gcloud-pub-sub.service' import { PUB_SUB_DEFAULT_BACKOFF_SETTINGS, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants' import { diff --git a/src/module/gcloud-pub-sub.service.spec.ts b/src/module/gcloud-pub-sub.service.spec.ts index 54c58f1..b9cb7be 100644 --- a/src/module/gcloud-pub-sub.service.spec.ts +++ b/src/module/gcloud-pub-sub.service.spec.ts @@ -1,4 +1,6 @@ -import { Test, TestingModule } from '@nestjs/testing' +import { Test } from '@nestjs/testing' +import type { TestingModule } from '@nestjs/testing' + import { GcloudPubSubService } from './gcloud-pub-sub.service' import { mockGoogleAuthOptions, mockPublishOptions } from '../helpers/testHelpers' @@ -38,84 +40,84 @@ describe('GcloudPubSubService', () => { const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try' const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles Buffer as data', () => { const topic = 'Homer' const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try' const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, Buffer.from(data)) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles an array of numbers as data', () => { const topic = 'Homer' const data = [10, 20, 30, 40, 50] const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles an ArrayBuffer as data', () => { const topic = 'Homer' const data = new ArrayBuffer(1) const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles a SharedArrayBuffer as data', () => { const topic = 'Homer' const data = new SharedArrayBuffer(1) const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles a Uint8Array as data', () => { const topic = 'Homer' const data = new Uint8Array([1, 2, 3]) const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) it('handles a string and binary encoding', () => { const topic = 'Homer' @@ -123,14 +125,14 @@ describe('GcloudPubSubService', () => { const encoding = 'binary' const gcloudPubSubLibMock = { topic: jest.fn().mockReturnThis(), - publish: jest.fn((buffer) => { - expect(buffer).toMatchSnapshot() + publishMessage: jest.fn((buffer) => { + expect(buffer.data).toMatchSnapshot() }), } service.gcloudPubSubLib = gcloudPubSubLibMock as any service.publishMessage(topic, data, {}, encoding) - expect(gcloudPubSubLibMock.publish).toHaveBeenCalled() + expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled() }) }) }) diff --git a/src/module/gcloud-pub-sub.service.ts b/src/module/gcloud-pub-sub.service.ts index 8a8f617..f65f718 100644 --- a/src/module/gcloud-pub-sub.service.ts +++ b/src/module/gcloud-pub-sub.service.ts @@ -1,7 +1,8 @@ import { Injectable } from '@nestjs/common' import { PubSub } from '@google-cloud/pubsub' -import { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface' -import { PublishOptions } from '@google-cloud/pubsub/build/src/topic' +import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic' + +import type { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface' @Injectable() export class GcloudPubSubService { @@ -15,7 +16,7 @@ export class GcloudPubSubService { } public publishMessage( - topic: string, + topicName: string, data: string | Uint8Array | number[] | ArrayBuffer | SharedArrayBuffer, attributes: { [key: string]: string } = {}, encoding?: BufferEncoding @@ -34,6 +35,14 @@ export class GcloudPubSubService { } else { dataBuffer = Buffer.from(data as string) } - return this.gcloudPubSubLib.topic(topic, this.publishOptions).publish(dataBuffer, attributes) + + const messageOptions = { + data: dataBuffer, + attributes: attributes, + } + + const topic = this.gcloudPubSubLib.topic(topicName, this.publishOptions) + + return topic.publishMessage(messageOptions) } }