Skip to content

Commit

Permalink
refactor: move to @boringnode/bus (#14)
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien-R44 authored Apr 7, 2024
1 parent bb4ea66 commit ff32759
Show file tree
Hide file tree
Showing 23 changed files with 169 additions and 893 deletions.
52 changes: 0 additions & 52 deletions packages/bentocache/benchmarks/encoders.ts

This file was deleted.

4 changes: 2 additions & 2 deletions packages/bentocache/factories/cache_factory.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import lodash from '@poppinss/utils/lodash'
import { getActiveTest } from '@japa/runner'
import { MemoryTransport } from '@boringnode/bus/transports/memory'

import { Cache } from '../src/cache/cache.js'
import { RedisDriver } from '../src/drivers/redis.js'
import { MemoryDriver } from '../src/drivers/memory.js'
import { MemoryBus } from '../src/bus/drivers/memory_bus.js'
import type { CacheStackDrivers } from '../src/types/main.js'
import { CacheStack } from '../src/cache/stack/cache_stack.js'
import { BentoCacheOptions } from '../src/bento_cache_options.js'
Expand Down Expand Up @@ -80,7 +80,7 @@ export class CacheFactory {
withL1L2Config() {
this.#parameters.l1Driver ??= new MemoryDriver({ maxSize: 100, prefix: 'test' })
this.#parameters.l2Driver ??= new RedisDriver({ connection: { host: '127.0.0.1', port: 6379 } })
this.#parameters.busDriver ??= new MemoryBus()
this.#parameters.busDriver ??= new MemoryTransport()

return this
}
Expand Down
2 changes: 1 addition & 1 deletion packages/bentocache/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@
}
},
"dependencies": {
"@paralleldrive/cuid2": "^2.2.2",
"@boringnode/bus": "^0.5.0",
"@poppinss/utils": "^6.7.3",
"async-mutex": "^0.5.0",
"chunkify": "^5.0.0",
Expand Down
130 changes: 22 additions & 108 deletions packages/bentocache/src/bus/bus.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { createId } from '@paralleldrive/cuid2'
import { Bus as RlanzBus } from '@boringnode/bus'
import type { Transport } from '@boringnode/bus/types/main'

import { RetryQueue } from './retry_queue.js'
import { CacheBusMessageType } from '../types/bus.js'
import type { LocalCache } from '../cache/facades/local_cache.js'
import { BusMessageReceived } from '../events/bus/bus_message_received.js'
import { BusMessagePublished } from '../events/bus/bus_message_published.js'
import type { BusDriver, BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main.js'
import type { BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main.js'

/**
* The bus is used to notify other processes about cache changes.
Expand All @@ -17,94 +17,42 @@ import type { BusDriver, BusOptions, CacheBusMessage, Emitter, Logger } from '..
* local cache accordingly.
*/
export class Bus {
/**
* The underlying bus driver
*/
#driver: BusDriver

/**
* The local cache that will be updated when a message is received
*/
#cache?: LocalCache

/**
* The logger to use
*/
#bus: RlanzBus
#logger: Logger

/**
* Emitter
*/
#emitter: Emitter

/**
* A unique identifier for this bus instance
* that is used to prevent the bus from
* emitting events to itself
*/
#busId = createId()

/**
* The channel name to use
*/
#cache?: LocalCache
#channelName = 'bentocache.notifications'

/**
* The error retry queue holds messages that failed to be sent
*/
#errorRetryQueue = new RetryQueue()

constructor(
driver: BusDriver,
driver: Transport,
cache: LocalCache,
logger: Logger,
emitter: Emitter,
options: BusOptions = {},
) {
this.#driver = driver
this.#cache = cache
this.#emitter = emitter
this.#logger = logger.child({ context: 'bentocache.bus' })
this.#errorRetryQueue = new RetryQueue(options.retryQueue?.enabled, options.retryQueue?.maxSize)

driver
.setId(this.#busId)
.setLogger(this.#logger)
.onReconnect(() => this.#onReconnect())
}

/**
* Process the error retry queue
*/
async #processErrorRetryQueue() {
this.#logger.debug(
`starting error retry queue processing with ${this.#errorRetryQueue.size()} messages`,
)

await this.#errorRetryQueue.process(async (message) => {
await this.publish(message)
return true
this.#bus = new RlanzBus(driver, {
retryQueue: {
...options.retryQueue,
removeDuplicates: true,
retryInterval: options.retryQueue?.retryInterval || false,
},
})

this.#bus.subscribe<CacheBusMessage>(this.#channelName, this.#onMessage.bind(this))
}

/**
* When a message is received through the bus.
* This is where we update the local cache.
*/
async #onMessage(message: CacheBusMessage) {
/**
* Since we received a message from the bus, we assume that
* the Bus is working. So we can try process the error retry queue if
* there are any messages in it.
*/
await this.#processErrorRetryQueue()

this.#logger.trace({ keys: message.keys, type: message.type }, 'received message from bus')
this.#emitter.emit('bus:message:received', new BusMessageReceived(message))

/**
* Process the message
*/
if (message.type === CacheBusMessageType.Delete) {
for (const key of message.keys) this.#cache?.delete(key)
}
Expand All @@ -118,60 +66,26 @@ export class Bus {
}
}

/**
* When the bus driver reconnects after a disconnection
*/
async #onReconnect() {
this.#logger.debug('bus driver reconnected')
await this.#processErrorRetryQueue()
}

/**
* Subscribe to the bus channel
*/
async subscribe() {
this.#driver.subscribe(this.#channelName, this.#onMessage.bind(this))
}

/**
* Publish a message to the bus channel
*
* @returns true if the message was published, false if not
*/
async publish(message: Omit<CacheBusMessage, 'busId'>): Promise<boolean> {
const fullMessage = { ...message, busId: this.#busId }

try {
this.#logger.trace({ keys: message.keys, type: message.type }, 'publishing message to bus')

/**
* Publish the message to the bus using the underlying driver
*/
await this.#driver.publish(this.#channelName, fullMessage)

/**
* Emit the bus:message:published event
*/
this.#emitter.emit('bus:message:published', new BusMessagePublished(fullMessage))
async publish(message: CacheBusMessage): Promise<boolean> {
const wasPublished = await this.#bus.publish(this.#channelName, message)
if (wasPublished) {
this.#emitter.emit('bus:message:published', new BusMessagePublished(message))
return true
} catch (error) {
this.#logger.error(error, 'failed to publish message to bus')

/**
* Add to the error retry queue
*/
const wasAdded = this.#errorRetryQueue.enqueue(fullMessage)
if (!wasAdded) return false

this.#logger.debug(message, 'added message to error retry queue')
return false
}

this.#logger.error('failed to publish message to bus')
return false
}

/**
* Disconnect the bus
*/
async disconnect(): Promise<void> {
this.#driver.disconnect()
await this.#bus.disconnect()
}
}
92 changes: 0 additions & 92 deletions packages/bentocache/src/bus/drivers/memory_bus.ts

This file was deleted.

Loading

0 comments on commit ff32759

Please sign in to comment.