Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: move to @boringnode/bus #14

Merged
merged 10 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 0 additions & 52 deletions packages/bentocache/benchmarks/encoders.ts

This file was deleted.

4 changes: 2 additions & 2 deletions packages/bentocache/factories/cache_factory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import lodash from '@poppinss/utils/lodash'
import { getActiveTest } from '@japa/runner'
import { MemoryTransport } from '@boringnode/bus/transports/memory'

import { Cache } from '../src/cache/cache.js'
import { RedisDriver } from '../src/drivers/redis.js'
import { MemoryDriver } from '../src/drivers/memory.js'
import { MemoryBus } from '../src/bus/drivers/memory_bus.js'
import type { CacheStackDrivers } from '../src/types/main.js'
import { CacheStack } from '../src/cache/stack/cache_stack.js'
import { BentoCacheOptions } from '../src/bento_cache_options.js'
Expand Down Expand Up @@ -80,7 +80,7 @@ export class CacheFactory {
withL1L2Config() {
this.#parameters.l1Driver ??= new MemoryDriver({ maxSize: 100, prefix: 'test' })
this.#parameters.l2Driver ??= new RedisDriver({ connection: { host: '127.0.0.1', port: 6379 } })
this.#parameters.busDriver ??= new MemoryBus()
this.#parameters.busDriver ??= new MemoryTransport()

return this
}
Expand Down
2 changes: 1 addition & 1 deletion packages/bentocache/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
}
},
"dependencies": {
"@paralleldrive/cuid2": "^2.2.2",
"@boringnode/bus": "^0.5.0",
"@poppinss/utils": "^6.7.3",
"async-mutex": "^0.5.0",
"chunkify": "^5.0.0",
Expand Down
130 changes: 22 additions & 108 deletions packages/bentocache/src/bus/bus.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { createId } from '@paralleldrive/cuid2'
import { Bus as RlanzBus } from '@boringnode/bus'
import type { Transport } from '@boringnode/bus/types/main'

import { RetryQueue } from './retry_queue.js'
import { CacheBusMessageType } from '../types/bus.js'
import type { LocalCache } from '../cache/facades/local_cache.js'
import { BusMessageReceived } from '../events/bus/bus_message_received.js'
import { BusMessagePublished } from '../events/bus/bus_message_published.js'
import type { BusDriver, BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main.js'
import type { BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main.js'

/**
* The bus is used to notify other processes about cache changes.
Expand All @@ -17,94 +17,42 @@ import type { BusDriver, BusOptions, CacheBusMessage, Emitter, Logger } from '..
* local cache accordingly.
*/
export class Bus {
/**
* The underlying bus driver
*/
#driver: BusDriver

/**
* The local cache that will be updated when a message is received
*/
#cache?: LocalCache

/**
* The logger to use
*/
#bus: RlanzBus
#logger: Logger

/**
* Emitter
*/
#emitter: Emitter

/**
* A unique identifier for this bus instance
* that is used to prevent the bus from
* emitting events to itself
*/
#busId = createId()

/**
* The channel name to use
*/
#cache?: LocalCache
#channelName = 'bentocache.notifications'

/**
* The error retry queue holds messages that failed to be sent
*/
#errorRetryQueue = new RetryQueue()

constructor(
driver: BusDriver,
driver: Transport,
cache: LocalCache,
logger: Logger,
emitter: Emitter,
options: BusOptions = {},
) {
this.#driver = driver
this.#cache = cache
this.#emitter = emitter
this.#logger = logger.child({ context: 'bentocache.bus' })
this.#errorRetryQueue = new RetryQueue(options.retryQueue?.enabled, options.retryQueue?.maxSize)

driver
.setId(this.#busId)
.setLogger(this.#logger)
.onReconnect(() => this.#onReconnect())
}

/**
* Process the error retry queue
*/
async #processErrorRetryQueue() {
this.#logger.debug(
`starting error retry queue processing with ${this.#errorRetryQueue.size()} messages`,
)

await this.#errorRetryQueue.process(async (message) => {
await this.publish(message)
return true
this.#bus = new RlanzBus(driver, {
retryQueue: {
...options.retryQueue,
removeDuplicates: true,
retryInterval: options.retryQueue?.retryInterval || false,
},
})

this.#bus.subscribe<CacheBusMessage>(this.#channelName, this.#onMessage.bind(this))
}

/**
* When a message is received through the bus.
* This is where we update the local cache.
*/
async #onMessage(message: CacheBusMessage) {
/**
* Since we received a message from the bus, we assume that
* the Bus is working. So we can try process the error retry queue if
* there are any messages in it.
*/
await this.#processErrorRetryQueue()

this.#logger.trace({ keys: message.keys, type: message.type }, 'received message from bus')
this.#emitter.emit('bus:message:received', new BusMessageReceived(message))

/**
* Process the message
*/
if (message.type === CacheBusMessageType.Delete) {
for (const key of message.keys) this.#cache?.delete(key)
}
Expand All @@ -118,60 +66,26 @@ export class Bus {
}
}

/**
* When the bus driver reconnects after a disconnection
*/
async #onReconnect() {
this.#logger.debug('bus driver reconnected')
await this.#processErrorRetryQueue()
}

/**
* Subscribe to the bus channel
*/
async subscribe() {
this.#driver.subscribe(this.#channelName, this.#onMessage.bind(this))
}

/**
* Publish a message to the bus channel
*
* @returns true if the message was published, false if not
*/
async publish(message: Omit<CacheBusMessage, 'busId'>): Promise<boolean> {
const fullMessage = { ...message, busId: this.#busId }

try {
this.#logger.trace({ keys: message.keys, type: message.type }, 'publishing message to bus')

/**
* Publish the message to the bus using the underlying driver
*/
await this.#driver.publish(this.#channelName, fullMessage)

/**
* Emit the bus:message:published event
*/
this.#emitter.emit('bus:message:published', new BusMessagePublished(fullMessage))
async publish(message: CacheBusMessage): Promise<boolean> {
const wasPublished = await this.#bus.publish(this.#channelName, message)
if (wasPublished) {
this.#emitter.emit('bus:message:published', new BusMessagePublished(message))
return true
} catch (error) {
this.#logger.error(error, 'failed to publish message to bus')

/**
* Add to the error retry queue
*/
const wasAdded = this.#errorRetryQueue.enqueue(fullMessage)
if (!wasAdded) return false

this.#logger.debug(message, 'added message to error retry queue')
return false
}

this.#logger.error('failed to publish message to bus')
return false
}

/**
* Disconnect the bus
*/
async disconnect(): Promise<void> {
this.#driver.disconnect()
await this.#bus.disconnect()
}
}
92 changes: 0 additions & 92 deletions packages/bentocache/src/bus/drivers/memory_bus.ts

This file was deleted.

Loading