From d7f88a8f968b4fc86b7c6c555597715416b3cf4c Mon Sep 17 00:00:00 2001 From: Gautam K Date: Tue, 1 Oct 2024 13:40:07 +0530 Subject: [PATCH 1/6] Fix for bus synchronization with namespace --- packages/bentocache/package.json | 3 ++- packages/bentocache/src/bus/bus.ts | 6 ++++- .../bentocache/src/cache/stack/cache_stack.ts | 26 +++++++++++------- packages/bentocache/src/types/bus.ts | 3 ++- packages/bentocache/tests/bus/bus.spec.ts | 27 +++++++++++++++++++ 5 files changed, 52 insertions(+), 13 deletions(-) diff --git a/packages/bentocache/package.json b/packages/bentocache/package.json index b8b1ac8..0738cee 100644 --- a/packages/bentocache/package.json +++ b/packages/bentocache/package.json @@ -1,7 +1,7 @@ { "name": "bentocache", "type": "module", - "version": "1.0.0-beta.9", + "version": "1.0.0-beta.10", "description": "Multi-tier cache module for Node.js. Redis, Upstash, CloudfareKV, File, in-memory and others drivers", "author": "Julien Ripouteau ", "license": "MIT", @@ -75,6 +75,7 @@ "@boringnode/bus": "^0.6.0", "@poppinss/utils": "^6.7.3", "async-mutex": "^0.5.0", + "bentocache": "file:", "chunkify": "^5.0.0", "hexoid": "^1.0.0", "lru-cache": "^10.2.2", diff --git a/packages/bentocache/src/bus/bus.ts b/packages/bentocache/src/bus/bus.ts index 185d980..770d9d6 100644 --- a/packages/bentocache/src/bus/bus.ts +++ b/packages/bentocache/src/bus/bus.ts @@ -2,6 +2,7 @@ import { Bus as RlanzBus } from '@boringnode/bus' import type { Transport } from '@boringnode/bus/types/main' import { CacheBusMessageType } from '../types/bus.js' +import { BaseDriver } from '../drivers/base_driver.js' import type { LocalCache } from '../cache/facades/local_cache.js' import { BusMessageReceived } from '../events/bus/bus_message_received.js' import { BusMessagePublished } from '../events/bus/bus_message_published.js' @@ -16,7 +17,7 @@ import type { BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main * the same channel and will receive the message and update their * local cache accordingly. */ -export class Bus { +export class Bus extends BaseDriver { #bus: RlanzBus #logger: Logger #emitter: Emitter @@ -30,6 +31,7 @@ export class Bus { emitter: Emitter, options: BusOptions = {}, ) { + super(options) this.#cache = cache this.#emitter = emitter this.#logger = logger.child({ context: 'bentocache.bus' }) @@ -72,6 +74,8 @@ export class Bus { * @returns true if the message was published, false if not */ async publish(message: CacheBusMessage): Promise { + // Namespace all the keys before publishing + message.keys = message.keys.map((key) => this.getItemKey(key)) const wasPublished = await this.#bus.publish(this.#channelName, message) if (wasPublished) { this.#emitter.emit('bus:message:published', new BusMessagePublished(message)) diff --git a/packages/bentocache/src/cache/stack/cache_stack.ts b/packages/bentocache/src/cache/stack/cache_stack.ts index dd16dc9..848d7d9 100644 --- a/packages/bentocache/src/cache/stack/cache_stack.ts +++ b/packages/bentocache/src/cache/stack/cache_stack.ts @@ -20,6 +20,8 @@ export class CacheStack { l1?: LocalCache l2?: RemoteCache bus?: Bus + #busDriver?: BusDriver + #busOptions?: BusOptions defaultOptions: CacheEntryOptions logger: Logger @@ -46,22 +48,26 @@ export class CacheStack { if (bus) return bus if (!busDriver || !this.l1) return - const opts = lodash.merge({ retryQueue: { enabled: true, maxSize: undefined } }, busOptions) - const newBus = new Bus(busDriver, this.l1, this.logger, this.emitter, opts) + this.#busDriver = busDriver + this.#busOptions = lodash.merge( + { retryQueue: { enabled: true, maxSize: undefined } }, + busOptions, + ) + const newBus = new Bus(this.#busDriver, this.l1, this.logger, this.emitter, this.#busOptions) return newBus } namespace(namespace: string) { - return new CacheStack( - this.name, - this.options, - { - l1Driver: this.l1?.namespace(namespace), - l2Driver: this.l2?.namespace(namespace), + return new CacheStack(this.name, this.options, { + l1Driver: this.l1?.namespace(namespace), + l2Driver: this.l2?.namespace(namespace), + busDriver: this.#busDriver, + busOptions: { + ...this.#busOptions, + prefix: namespace, }, - this.bus, - ) + }) } emit(event: CacheEvent) { diff --git a/packages/bentocache/src/types/bus.ts b/packages/bentocache/src/types/bus.ts index de505ca..106c8c1 100644 --- a/packages/bentocache/src/types/bus.ts +++ b/packages/bentocache/src/types/bus.ts @@ -1,6 +1,7 @@ import type { Transport } from '@boringnode/bus/types/main' import type { Duration } from './helpers.js' +import type { DriverCommonOptions } from './main.js' /** * Interface for the bus driver @@ -53,4 +54,4 @@ export type BusOptions = { */ retryInterval?: Duration | false } -} +} & DriverCommonOptions diff --git a/packages/bentocache/tests/bus/bus.spec.ts b/packages/bentocache/tests/bus/bus.spec.ts index c12c089..75b5af1 100644 --- a/packages/bentocache/tests/bus/bus.spec.ts +++ b/packages/bentocache/tests/bus/bus.spec.ts @@ -33,6 +33,33 @@ test.group('Bus synchronization', () => { assert.isUndefined(await cache3.get(key)) }).disableTimeout() + test('synchronize multiple cache with namespace', async ({ assert }) => { + const key = 'foo' + + const [f1] = new CacheFactory().withL1L2Config().create() + const [f2] = new CacheFactory().withL1L2Config().create() + const [f3] = new CacheFactory().withL1L2Config().create() + + const cache1 = f1.namespace('users') + const cache2 = f2.namespace('users') + const cache3 = f3.namespace('users') + + await cache1.set(key, 24) + await setTimeout(100) + + assert.equal(await cache1.get(key), 24) + assert.equal(await cache2.get(key), 24) + assert.equal(await cache3.get(key), 24) + + await cache1.delete(key) + + await setTimeout(100) + + assert.isUndefined(await cache1.get(key)) + assert.isUndefined(await cache2.get(key)) + assert.isUndefined(await cache3.get(key)) + }).disableTimeout() + test('retry queue processing', async ({ assert }) => { const bus1 = new ChaosBus(new MemoryTransport()) const bus2 = new ChaosBus(new MemoryTransport()) From 3b5747ef17f7c7b5f42026dffc5ac863d63696cb Mon Sep 17 00:00:00 2001 From: Gautam K Date: Tue, 1 Oct 2024 13:53:47 +0530 Subject: [PATCH 2/6] Modified sync namespace test case --- packages/bentocache/tests/bus/bus.spec.ts | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/bentocache/tests/bus/bus.spec.ts b/packages/bentocache/tests/bus/bus.spec.ts index 75b5af1..3bc704b 100644 --- a/packages/bentocache/tests/bus/bus.spec.ts +++ b/packages/bentocache/tests/bus/bus.spec.ts @@ -42,22 +42,23 @@ test.group('Bus synchronization', () => { const cache1 = f1.namespace('users') const cache2 = f2.namespace('users') - const cache3 = f3.namespace('users') + const cache3 = f3.namespace('admin') await cache1.set(key, 24) + await cache3.set(key, 42) await setTimeout(100) assert.equal(await cache1.get(key), 24) assert.equal(await cache2.get(key), 24) - assert.equal(await cache3.get(key), 24) + assert.equal(await cache3.get(key), 42) - await cache1.delete(key) + await cache1.clear() await setTimeout(100) assert.isUndefined(await cache1.get(key)) assert.isUndefined(await cache2.get(key)) - assert.isUndefined(await cache3.get(key)) + assert.equal(await cache3.get(key), 42) }).disableTimeout() test('retry queue processing', async ({ assert }) => { From 43bb70a00aec27083fe8df06addea5999b178ced Mon Sep 17 00:00:00 2001 From: Gautam K Date: Tue, 1 Oct 2024 14:55:40 +0530 Subject: [PATCH 3/6] More complex sync namespace test case --- packages/bentocache/package.json | 3 +- packages/bentocache/tests/bus/bus.spec.ts | 35 +++++++++++++---------- 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/packages/bentocache/package.json b/packages/bentocache/package.json index 0738cee..b8b1ac8 100644 --- a/packages/bentocache/package.json +++ b/packages/bentocache/package.json @@ -1,7 +1,7 @@ { "name": "bentocache", "type": "module", - "version": "1.0.0-beta.10", + "version": "1.0.0-beta.9", "description": "Multi-tier cache module for Node.js. Redis, Upstash, CloudfareKV, File, in-memory and others drivers", "author": "Julien Ripouteau ", "license": "MIT", @@ -75,7 +75,6 @@ "@boringnode/bus": "^0.6.0", "@poppinss/utils": "^6.7.3", "async-mutex": "^0.5.0", - "bentocache": "file:", "chunkify": "^5.0.0", "hexoid": "^1.0.0", "lru-cache": "^10.2.2", diff --git a/packages/bentocache/tests/bus/bus.spec.ts b/packages/bentocache/tests/bus/bus.spec.ts index 3bc704b..b205d26 100644 --- a/packages/bentocache/tests/bus/bus.spec.ts +++ b/packages/bentocache/tests/bus/bus.spec.ts @@ -36,29 +36,34 @@ test.group('Bus synchronization', () => { test('synchronize multiple cache with namespace', async ({ assert }) => { const key = 'foo' - const [f1] = new CacheFactory().withL1L2Config().create() - const [f2] = new CacheFactory().withL1L2Config().create() - const [f3] = new CacheFactory().withL1L2Config().create() + const [cache1] = new CacheFactory().withL1L2Config().create() + const [cache2] = new CacheFactory().withL1L2Config().create() + const [cache3] = new CacheFactory().withL1L2Config().create() - const cache1 = f1.namespace('users') - const cache2 = f2.namespace('users') - const cache3 = f3.namespace('admin') + const cache1NSUsersMe = cache1.namespace('users').namespace('me') + const cache2NSUsersMe = cache2.namespace('users').namespace('me') + const cache3NSAdmin = cache3.namespace('admin') - await cache1.set(key, 24) - await cache3.set(key, 42) + await cache1NSUsersMe.set(key, 24) + await cache3NSAdmin.set(key, 42) await setTimeout(100) - assert.equal(await cache1.get(key), 24) - assert.equal(await cache2.get(key), 24) - assert.equal(await cache3.get(key), 42) + assert.equal(await cache1NSUsersMe.get(key), 24) + assert.equal(await cache2NSUsersMe.get(key), 24) + assert.equal(await cache3NSAdmin.get(key), 42) - await cache1.clear() + await cache1NSUsersMe.clear() await setTimeout(100) - assert.isUndefined(await cache1.get(key)) - assert.isUndefined(await cache2.get(key)) - assert.equal(await cache3.get(key), 42) + assert.isUndefined(await cache1NSUsersMe.get(key)) + assert.isUndefined(await cache2.namespace('users').namespace('me').get(key)) + assert.equal(await cache3NSAdmin.get(key), 42) + + await cache2.namespace('admin').clear() + await setTimeout(100) + + assert.isUndefined(await cache3NSAdmin.get(key)) }).disableTimeout() test('retry queue processing', async ({ assert }) => { From 1c7f9bbb583c979a27e64039ddc5e292462a70dc Mon Sep 17 00:00:00 2001 From: Gautam K Date: Tue, 1 Oct 2024 15:14:09 +0530 Subject: [PATCH 4/6] Removed redundant code --- packages/bentocache/src/bus/bus.ts | 6 +---- .../bentocache/src/cache/stack/cache_stack.ts | 5 +--- packages/bentocache/src/types/bus.ts | 3 +-- packages/bentocache/tests/bus/bus.spec.ts | 25 ++++++++++++++++++- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/packages/bentocache/src/bus/bus.ts b/packages/bentocache/src/bus/bus.ts index 770d9d6..185d980 100644 --- a/packages/bentocache/src/bus/bus.ts +++ b/packages/bentocache/src/bus/bus.ts @@ -2,7 +2,6 @@ import { Bus as RlanzBus } from '@boringnode/bus' import type { Transport } from '@boringnode/bus/types/main' import { CacheBusMessageType } from '../types/bus.js' -import { BaseDriver } from '../drivers/base_driver.js' import type { LocalCache } from '../cache/facades/local_cache.js' import { BusMessageReceived } from '../events/bus/bus_message_received.js' import { BusMessagePublished } from '../events/bus/bus_message_published.js' @@ -17,7 +16,7 @@ import type { BusOptions, CacheBusMessage, Emitter, Logger } from '../types/main * the same channel and will receive the message and update their * local cache accordingly. */ -export class Bus extends BaseDriver { +export class Bus { #bus: RlanzBus #logger: Logger #emitter: Emitter @@ -31,7 +30,6 @@ export class Bus extends BaseDriver { emitter: Emitter, options: BusOptions = {}, ) { - super(options) this.#cache = cache this.#emitter = emitter this.#logger = logger.child({ context: 'bentocache.bus' }) @@ -74,8 +72,6 @@ export class Bus extends BaseDriver { * @returns true if the message was published, false if not */ async publish(message: CacheBusMessage): Promise { - // Namespace all the keys before publishing - message.keys = message.keys.map((key) => this.getItemKey(key)) const wasPublished = await this.#bus.publish(this.#channelName, message) if (wasPublished) { this.#emitter.emit('bus:message:published', new BusMessagePublished(message)) diff --git a/packages/bentocache/src/cache/stack/cache_stack.ts b/packages/bentocache/src/cache/stack/cache_stack.ts index 848d7d9..c190b31 100644 --- a/packages/bentocache/src/cache/stack/cache_stack.ts +++ b/packages/bentocache/src/cache/stack/cache_stack.ts @@ -63,10 +63,7 @@ export class CacheStack { l1Driver: this.l1?.namespace(namespace), l2Driver: this.l2?.namespace(namespace), busDriver: this.#busDriver, - busOptions: { - ...this.#busOptions, - prefix: namespace, - }, + busOptions: this.#busOptions, }) } diff --git a/packages/bentocache/src/types/bus.ts b/packages/bentocache/src/types/bus.ts index 106c8c1..de505ca 100644 --- a/packages/bentocache/src/types/bus.ts +++ b/packages/bentocache/src/types/bus.ts @@ -1,7 +1,6 @@ import type { Transport } from '@boringnode/bus/types/main' import type { Duration } from './helpers.js' -import type { DriverCommonOptions } from './main.js' /** * Interface for the bus driver @@ -54,4 +53,4 @@ export type BusOptions = { */ retryInterval?: Duration | false } -} & DriverCommonOptions +} diff --git a/packages/bentocache/tests/bus/bus.spec.ts b/packages/bentocache/tests/bus/bus.spec.ts index b205d26..0d9df72 100644 --- a/packages/bentocache/tests/bus/bus.spec.ts +++ b/packages/bentocache/tests/bus/bus.spec.ts @@ -33,13 +33,36 @@ test.group('Bus synchronization', () => { assert.isUndefined(await cache3.get(key)) }).disableTimeout() - test('synchronize multiple cache with namespace', async ({ assert }) => { + test('synchronize multiple cache with a namespace', async ({ assert }) => { const key = 'foo' const [cache1] = new CacheFactory().withL1L2Config().create() const [cache2] = new CacheFactory().withL1L2Config().create() const [cache3] = new CacheFactory().withL1L2Config().create() + await cache1.namespace('users').set(key, 24) + await setTimeout(100) + + assert.equal(await cache1.namespace('users').get(key), 24) + assert.equal(await cache2.namespace('users').get(key), 24) + assert.equal(await cache3.namespace('users').get(key), 24) + + await cache1.namespace('users').delete(key) + + await setTimeout(100) + + assert.isUndefined(await cache1.namespace('users').get(key)) + assert.isUndefined(await cache2.namespace('users').get(key)) + assert.isUndefined(await cache3.namespace('users').get(key)) + }).disableTimeout() + + test('synchronize multiple cache with multiple namespaces', async ({ assert }) => { + const key = 'bar' + + const [cache1] = new CacheFactory().withL1L2Config().create() + const [cache2] = new CacheFactory().withL1L2Config().create() + const [cache3] = new CacheFactory().withL1L2Config().create() + const cache1NSUsersMe = cache1.namespace('users').namespace('me') const cache2NSUsersMe = cache2.namespace('users').namespace('me') const cache3NSAdmin = cache3.namespace('admin') From 37f8329f2777ecb25c016408e1d524ca62aae738 Mon Sep 17 00:00:00 2001 From: Gautam K Date: Sat, 5 Oct 2024 00:06:55 +0530 Subject: [PATCH 5/6] fix: binary encoding ignored the clear message type --- .../src/bus/encoders/binary_encoder.ts | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/packages/bentocache/src/bus/encoders/binary_encoder.ts b/packages/bentocache/src/bus/encoders/binary_encoder.ts index 766b9df..d92f476 100644 --- a/packages/bentocache/src/bus/encoders/binary_encoder.ts +++ b/packages/bentocache/src/bus/encoders/binary_encoder.ts @@ -30,6 +30,18 @@ export class BinaryEncoder implements TransportEncoder { this.#busIdLength = busIdLength } + protected busMessageTypeToNum(type: CacheBusMessageType): number { + if (type === CacheBusMessageType.Set) return 0x01 + if (type === CacheBusMessageType.Clear) return 0x02 + return 0x03 + } + + protected numToBusMessageType(num: number): CacheBusMessageType { + if (num === 0x01) return CacheBusMessageType.Set + if (num === 0x02) return CacheBusMessageType.Clear + return CacheBusMessageType.Delete + } + /** * Encode the given message into a Buffer */ @@ -59,7 +71,7 @@ export class BinaryEncoder implements TransportEncoder { /** * 2. write the message type. 0x01 for 'Set' message, and 0x02 for a 'Delete' message */ - buffer.writeUInt8(payload.type === CacheBusMessageType.Set ? 0x01 : 0x02, this.#busIdLength) + buffer.writeUInt8(this.busMessageTypeToNum(payload.type), this.#busIdLength) /** * 3. Write the keys @@ -100,7 +112,7 @@ export class BinaryEncoder implements TransportEncoder { * Then comes the message type as a single byte */ const typeValue = buffer.readUInt8(offset++) - const type = typeValue === 0x01 ? CacheBusMessageType.Set : CacheBusMessageType.Delete + const type = this.numToBusMessageType(typeValue) /** * Finally, the keys From 659e4e1b2baa631d252c9fef39d5d1fea7a8d400 Mon Sep 17 00:00:00 2001 From: Gautam K Date: Sat, 5 Oct 2024 00:25:32 +0530 Subject: [PATCH 6/6] tests: added unit test to check binary encoding with the Clear message type --- packages/bentocache/tests/bus/bus.spec.ts | 31 +++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/packages/bentocache/tests/bus/bus.spec.ts b/packages/bentocache/tests/bus/bus.spec.ts index 0d9df72..977faa9 100644 --- a/packages/bentocache/tests/bus/bus.spec.ts +++ b/packages/bentocache/tests/bus/bus.spec.ts @@ -269,6 +269,37 @@ test.group('Bus synchronization', () => { }) .waitForDone() .disableTimeout() + + test('binary encoding/decoding using Clear should be fine', async ({ assert, cleanup }, done) => { + const bus1 = redisBusDriver({ connection: REDIS_CREDENTIALS }) + .factory(null as any) + .setId('foo') + + const bus2 = redisBusDriver({ connection: REDIS_CREDENTIALS }) + .factory(null as any) + .setId('bar') + + cleanup(async () => { + await bus1.disconnect() + await bus2.disconnect() + }) + + const data = { + keys: [], + type: CacheBusMessageType.Clear, + } + + bus1.subscribe('foo', (message: any) => { + assert.deepInclude(message, data) + done() + }) + + await setTimeout(200) + + await bus2.publish('foo', data) + }) + .waitForDone() + .disableTimeout() test('works with utf8 characters', async ({ assert }, done) => { const bus1 = redisBusDriver({ connection: REDIS_CREDENTIALS })