Skip to content

Commit

Permalink
change imports
Browse files Browse the repository at this point in the history
  • Loading branch information
viktree committed Apr 18, 2024
1 parent 6d6a500 commit 17c808b
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 91 deletions.
38 changes: 26 additions & 12 deletions src/helpers/constants.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,25 @@
const enum PubSubRetryCodes {
CANCELLED = 1,
UNKNOWN = 2,
DEADLINE_EXCEEDED = 4,
NOT_FOUND = 5,
PERMISSION_DENIED = 7,
RESOURCE_EXHAUSTED = 8,
ABORTED = 10,
INTERNAL = 13,
UNAVALIBLE = 14,
}

export const PUB_SUB_DEFAULT_RETRY_CODES = [
10, // 'ABORTED'
1, // 'CANCELLED',
4, // 'DEADLINE_EXCEEDED'
13, // 'INTERNAL'
8, // 'RESOURCE_EXHAUSTED'
14, // 'UNAVAILABLE'
2, // 'UNKNOWN'
5, // NOT_FOUND'
7, // PERMISSION_DENIED
PubSubRetryCodes.CANCELLED,
PubSubRetryCodes.UNKNOWN,
PubSubRetryCodes.DEADLINE_EXCEEDED,
PubSubRetryCodes.NOT_FOUND,
PubSubRetryCodes.PERMISSION_DENIED,
PubSubRetryCodes.RESOURCE_EXHAUSTED,
PubSubRetryCodes.ABORTED,
PubSubRetryCodes.INTERNAL,
PubSubRetryCodes.UNAVALIBLE,
]

export const PUB_SUB_DEFAULT_BACKOFF_SETTINGS = {
Expand All @@ -20,6 +32,8 @@ export const PUB_SUB_DEFAULT_BACKOFF_SETTINGS = {
totalTimeoutMillis: 600000,
}

export const MESSAGE = 'message'
export const ERROR = 'error'
export const CLOSE = 'close'
export const enum EVENT {
MESSAGE = 'message',
ERROR = 'error',
CLOSE = 'close',
}
9 changes: 5 additions & 4 deletions src/helpers/testHelpers.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface'
import type { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber'
import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic'

import type { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface'
import { PUB_SUB_DEFAULT_RETRY_CODES, PUB_SUB_DEFAULT_BACKOFF_SETTINGS } from './constants'
import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber'

export const mockGoogleAuthOptions: GoogleAuthOptions = {
projectId: 'entitlement',
projectId: 'projectId',
}

export const mockPublishOptions: PublishOptions = {
Expand Down
8 changes: 4 additions & 4 deletions src/interfaces/gcloud-pub-sub.interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Type } from '@nestjs/common'
import { ModuleMetadata } from '@nestjs/common/interfaces'
import { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber'
import type { Type } from '@nestjs/common'
import type { ModuleMetadata } from '@nestjs/common/interfaces'
import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import type { SubscriberOptions } from '@google-cloud/pubsub/build/src/subscriber'

export interface GCloudPubSubServerOptions {
authOptions: GoogleAuthOptions
Expand Down
50 changes: 20 additions & 30 deletions src/microservice/gcloud-pub-sub.server.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,24 +46,18 @@ describe('GCloudPubSubServer', () => {
it('Instantiates', () => {
expect(server.client).toBe(null)
expect(server.subscriptions.length).toBe(0)
expect(server.options).toMatchInlineSnapshot(`
{
"authOptions": {
"projectId": "entitlement",
},
"subscriberOptions": {
"flowControl": {
"allowExcessMessages": false,
"maxMessages": 5,
},
},
"subscriptionIds": [
"create",
"update",
"delete",
],
}
`)
expect(server.options).toEqual({
authOptions: {
projectId: 'projectId',
},
subscriberOptions: {
flowControl: {
allowExcessMessages: false,
maxMessages: 5,
},
},
subscriptionIds: ['create', 'update', 'delete'],
})
})

it('Instantiates without subscriberOptions', () => {
Expand All @@ -73,18 +67,12 @@ describe('GCloudPubSubServer', () => {
})
expect(server2.client).toBe(null)
expect(server2.subscriptions.length).toBe(0)
expect(server2.options).toMatchInlineSnapshot(`
{
"authOptions": {
"projectId": "entitlement",
},
"subscriptionIds": [
"create",
"update",
"delete",
],
}
`)
expect(server2.options).toEqual({
authOptions: {
projectId: 'projectId',
},
subscriptionIds: ['create', 'update', 'delete'],
})
})

describe('listen', () => {
Expand Down Expand Up @@ -210,6 +198,7 @@ describe('GCloudPubSubServer', () => {

// @ts-ignore
const handleErrorFunction = server.handleErrorFactory(subscription, subscriptionName)
// @ts-ignore
handleErrorFunction(error)

jest.advanceTimersByTime(5000)
Expand All @@ -235,6 +224,7 @@ describe('GCloudPubSubServer', () => {

// @ts-ignore
const handleErrorFunction = server.handleErrorFactory(subscription, subscriptionName)
// @ts-ignore
handleErrorFunction(error)

jest.advanceTimersByTime(5000)
Expand Down
23 changes: 10 additions & 13 deletions src/microservice/gcloud-pub-sub.server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,12 @@
import { PubSub, Subscription, Message } from '@google-cloud/pubsub'
import { Server, CustomTransportStrategy } from '@nestjs/microservices'

import { MESSAGE, ERROR, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants'
import { EVENT, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants'
import { GCloudPubSubServerOptions } from '../interfaces/gcloud-pub-sub.interface'

const RETRY_INTERVAL = 5000

export class GCloudPubSubServer
extends Server
implements CustomTransportStrategy
{
export class GCloudPubSubServer extends Server implements CustomTransportStrategy {
public client: PubSub = null
public subscriptions: Subscription[] = []
public isShuttingDown: boolean = false
Expand All @@ -28,15 +25,15 @@ export class GCloudPubSubServer
)
const handleMessage = this.handleMessageFactory(subcriptionName)
const handleError = this.handleErrorFactory(subscription, subcriptionName)
subscription.on(MESSAGE, handleMessage.bind(this))
subscription.on(ERROR, handleError)
subscription.on(EVENT.MESSAGE, handleMessage.bind(this))
subscription.on(EVENT.ERROR, handleError.bind(this))
this.subscriptions.push(subscription)
})
callback()
}

public handleErrorFactory(subscription: Subscription, subcriptionName: string) {
return (error): void => {
const handleError = (error: { code: number } & string): void => {
this.handleError(error)
if (!this.isShuttingDown && PUB_SUB_DEFAULT_RETRY_CODES.includes(error.code)) {
this.logger.warn(`Closing subscription: ${subcriptionName}`)
Expand All @@ -47,24 +44,24 @@ export class GCloudPubSubServer
}, RETRY_INTERVAL)
}
}
return handleError
}

public close() {
this.isShuttingDown = true
this.subscriptions.forEach((subscription) => {
subscription.close()
})
this.subscriptions.forEach((subscription) => subscription.close())
}

public handleMessageFactory(subscriptionName: string) {
return async (message: Message) => {
const handler = this.getHandlerByPattern(subscriptionName)
const handler = this.getHandlerByPattern(subscriptionName)
const handleMessage = async (message: Message) => {
if (!handler) {
this.logger.warn(`ack message with no active handler: ${message.id}`)
message.ack()
return
}
await handler(message)
}
return handleMessage
}
}
5 changes: 3 additions & 2 deletions src/module/gcloud-pub-sub.module.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { DynamicModule, Module, Provider } from '@nestjs/common'
import { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import { Module } from '@nestjs/common'
import type { DynamicModule, Provider } from '@nestjs/common'
import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import { GcloudPubSubService } from './gcloud-pub-sub.service'
import { PUB_SUB_DEFAULT_BACKOFF_SETTINGS, PUB_SUB_DEFAULT_RETRY_CODES } from '../helpers/constants'
import {
Expand Down
46 changes: 24 additions & 22 deletions src/module/gcloud-pub-sub.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import { Test, TestingModule } from '@nestjs/testing'
import { Test } from '@nestjs/testing'
import type { TestingModule } from '@nestjs/testing'

import { GcloudPubSubService } from './gcloud-pub-sub.service'
import { mockGoogleAuthOptions, mockPublishOptions } from '../helpers/testHelpers'

Expand Down Expand Up @@ -38,99 +40,99 @@ describe('GcloudPubSubService', () => {
const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try'
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles Buffer as data', () => {
const topic = 'Homer'
const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try'
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, Buffer.from(data))
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles an array of numbers as data', () => {
const topic = 'Homer'
const data = [10, 20, 30, 40, 50]
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles an ArrayBuffer as data', () => {
const topic = 'Homer'
const data = new ArrayBuffer(1)
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles a SharedArrayBuffer as data', () => {
const topic = 'Homer'
const data = new SharedArrayBuffer(1)
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles a Uint8Array as data', () => {
const topic = 'Homer'
const data = new Uint8Array([1, 2, 3])
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
it('handles a string and binary encoding', () => {
const topic = 'Homer'
const data = 'You Tried Your Best and You Failed Miserably. The Lesson Is Never Try'
const encoding = 'binary'
const gcloudPubSubLibMock = {
topic: jest.fn().mockReturnThis(),
publish: jest.fn((buffer) => {
expect(buffer).toMatchSnapshot()
publishMessage: jest.fn((buffer) => {
expect(buffer.data).toMatchSnapshot()
}),
}

service.gcloudPubSubLib = gcloudPubSubLibMock as any
service.publishMessage(topic, data, {}, encoding)
expect(gcloudPubSubLibMock.publish).toHaveBeenCalled()
expect(gcloudPubSubLibMock.publishMessage).toHaveBeenCalled()
})
})
})
Expand Down
17 changes: 13 additions & 4 deletions src/module/gcloud-pub-sub.service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { Injectable } from '@nestjs/common'
import { PubSub } from '@google-cloud/pubsub'
import { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface'
import { PublishOptions } from '@google-cloud/pubsub/build/src/topic'
import type { PublishOptions } from '@google-cloud/pubsub/build/src/topic'

import type { GoogleAuthOptions } from '../interfaces/gcloud-pub-sub.interface'

@Injectable()
export class GcloudPubSubService {
Expand All @@ -15,7 +16,7 @@ export class GcloudPubSubService {
}

public publishMessage(
topic: string,
topicName: string,
data: string | Uint8Array | number[] | ArrayBuffer | SharedArrayBuffer,
attributes: { [key: string]: string } = {},
encoding?: BufferEncoding
Expand All @@ -34,6 +35,14 @@ export class GcloudPubSubService {
} else {
dataBuffer = Buffer.from(data as string)
}
return this.gcloudPubSubLib.topic(topic, this.publishOptions).publish(dataBuffer, attributes)

const messageOptions = {
data: dataBuffer,
attributes: attributes,
}

const topic = this.gcloudPubSubLib.topic(topicName, this.publishOptions)

return topic.publishMessage(messageOptions)
}
}

0 comments on commit 17c808b

Please sign in to comment.