Skip to content

Commit

Permalink
feat: streaming poc (#679)
Browse files Browse the repository at this point in the history
Co-authored-by: Nuno Góis <[email protected]>
  • Loading branch information
kwasniew and nunogois authored Dec 12, 2024
1 parent 4115c39 commit 9a0fd22
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 4 deletions.
14 changes: 14 additions & 0 deletions examples/streaming.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
const { Unleash } = require('../lib');

const client = new Unleash({
appName: 'my-application',
url: 'https://app.unleash-hosted.com/demo/api/',
customHeaders: {
Authorization: '943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0',
},
experimentalMode: { type: 'streaming' },
skipInstanceCountWarning: true,
});
client.on('changed', () => {
console.log(client.isEnabled('demo001', { userId: `${Math.random()}` }));
});
4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "unleash-client",
"version": "6.2.0",
"version": "6.3.0-alpha.0",
"description": "Unleash Client for Node",
"license": "Apache-2.0",
"main": "./lib/index.js",
Expand Down Expand Up @@ -33,6 +33,7 @@
"http-proxy-agent": "^7.0.2",
"https-proxy-agent": "^7.0.5",
"ip-address": "^9.0.5",
"launchdarkly-eventsource": "2.0.3",
"make-fetch-happen": "^13.0.1",
"murmurhash3js": "^3.0.1",
"proxy-from-env": "^1.1.0",
Expand All @@ -50,6 +51,7 @@
"@ava/babel": "^2.0.0",
"@ava/typescript": "^4.0.0",
"@tsconfig/node12": "^12.0.0",
"@types/eventsource": "^1.1.15",
"@types/express": "^4.17.17",
"@types/jsbn": "^1.2.33",
"@types/make-fetch-happen": "^10.0.4",
Expand Down
2 changes: 1 addition & 1 deletion src/details.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{"name":"unleash-client-node","version":"6.2.0","sdkVersion":"unleash-client-node:6.2.0"}
{"name":"unleash-client-node","version":"6.3.0-alpha.0","sdkVersion":"unleash-client-node:6.3.0-alpha.0"}
31 changes: 30 additions & 1 deletion src/repository/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
Segment,
StrategyTransportInterface,
} from '../strategy/strategy';
// @ts-expect-error
import { EventSource } from 'launchdarkly-eventsource';

export const SUPPORTED_SPEC_VERSION = '4.3.0';

Expand All @@ -39,6 +41,7 @@ export interface RepositoryOptions {
bootstrapProvider: BootstrapProvider;
bootstrapOverride?: boolean;
storageProvider: StorageProvider<ClientFeaturesResponse>;
eventSource?: EventSource;
}

interface FeatureToggleData {
Expand Down Expand Up @@ -90,6 +93,8 @@ export default class Repository extends EventEmitter implements EventEmitter {

private segments: Map<number, Segment>;

private eventSource: EventSource | undefined;

constructor({
url,
appName,
Expand All @@ -105,6 +110,7 @@ export default class Repository extends EventEmitter implements EventEmitter {
bootstrapProvider,
bootstrapOverride = true,
storageProvider,
eventSource,
}: RepositoryOptions) {
super();
this.url = url;
Expand All @@ -122,10 +128,30 @@ export default class Repository extends EventEmitter implements EventEmitter {
this.bootstrapOverride = bootstrapOverride;
this.storageProvider = storageProvider;
this.segments = new Map();
this.eventSource = eventSource;
if (this.eventSource) {
this.eventSource.addEventListener('unleash-updated', (event: { data: string }) => {
try {
const data: ClientFeaturesResponse & { meta: { etag: string } } = JSON.parse(event.data);
const etag = data.meta.etag;
if (etag !== null) {
this.etag = etag;
} else {
this.etag = undefined;
}
this.save(data, true);
} catch (err) {
this.emit(UnleashEvents.Error, err);
}
});
this.eventSource.addEventListener('error', (error: unknown) => {
this.emit(UnleashEvents.Warn, error);
});
}
}

timedFetch(interval: number) {
if (interval > 0) {
if (interval > 0 && !this.eventSource) {
this.timer = setTimeout(() => this.fetch(), interval);
if (process.env.NODE_ENV !== 'test' && typeof this.timer.unref === 'function') {
this.timer.unref();
Expand Down Expand Up @@ -398,6 +424,9 @@ Message: ${err.message}`,
clearTimeout(this.timer);
}
this.removeAllListeners();
if (this.eventSource) {
this.eventSource.close();
}
}

getSegment(segmentId: number): Segment | undefined {
Expand Down
71 changes: 71 additions & 0 deletions src/test/repository.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Repository from '../repository';
import { DefaultBootstrapProvider } from '../repository/bootstrap-provider';
import { StorageProvider } from '../repository/storage-provider';
import { ClientFeaturesResponse } from '../feature';
import { EventEmitter } from 'events';

const appName = 'foo';
const instanceId = 'bar';
Expand Down Expand Up @@ -1360,3 +1361,73 @@ test('Stopping repository should stop storage provider updates', async (t) => {
const result = await storageProvider.get(appName);
t.is(result, undefined);
});

test('Streaming', async (t) => {
t.plan(6);
const url = 'irrelevant';
const feature = {
name: 'feature',
enabled: true,
strategies: [
{
name: 'default',
},
],
};
const storageProvider: StorageProvider<ClientFeaturesResponse> = new InMemStorageProvider();
const eventSource = {
eventEmitter: new EventEmitter(),
listeners: new Set<string>(),
addEventListener(eventName: string, handler: () => void) {
eventSource.listeners.add(eventName);
eventSource.eventEmitter.on(eventName, handler);
},
close() {
eventSource.listeners.forEach((eventName) => {
eventSource.eventEmitter.removeAllListeners(eventName);
});
},
emit(eventName: string, data: unknown) {
eventSource.eventEmitter.emit(eventName, data);
},
};
const repo = new Repository({
url,
appName,
instanceId,
refreshInterval: 10,
// @ts-expect-error
bootstrapProvider: new DefaultBootstrapProvider({}),
storageProvider,
eventSource,
});

const before = repo.getToggles();
t.deepEqual(before, []);

// update with feature
eventSource.emit('unleash-updated', {
type: 'unleash-updated',
data: JSON.stringify({ meta: {}, features: [feature] }),
});
const firstUpdate = repo.getToggles();
t.deepEqual(firstUpdate, [feature]);
// @ts-expect-error
t.is(repo.etag, undefined);

// update with etag
eventSource.emit('unleash-updated', {
type: 'unleash-updated',
data: JSON.stringify({ meta: { etag: 'updated' }, features: [] }),
});
const secondUpdate = repo.getToggles();
t.deepEqual(secondUpdate, []);
// @ts-expect-error
t.is(repo.etag, 'updated');

// SSE error translated to repo warning
repo.on('warn', (msg) => {
t.is(msg, 'some error');
});
eventSource.emit('error', 'some error');
});
3 changes: 3 additions & 0 deletions src/unleash-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import { BootstrapOptions } from './repository/bootstrap-provider';
import { StorageProvider } from './repository/storage-provider';
import { RepositoryInterface } from './repository';

export type Mode = { type: 'polling' } | { type: 'streaming' };

export interface UnleashConfig {
appName: string;
environment?: string;
Expand All @@ -32,4 +34,5 @@ export interface UnleashConfig {
storageProvider?: StorageProvider<ClientFeaturesResponse>;
disableAutoStart?: boolean;
skipInstanceCountWarning?: boolean;
experimentalMode?: Mode;
}
16 changes: 15 additions & 1 deletion src/unleash.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import { resolveBootstrapProvider } from './repository/bootstrap-provider';
import { ImpressionEvent, UnleashEvents } from './events';
import { UnleashConfig } from './unleash-config';
import FileStorageProvider from './repository/storage-provider-file';

import { resolveUrl } from './url-utils';
// @ts-expect-error
import { EventSource } from 'launchdarkly-eventsource';
export { Strategy, UnleashEvents, UnleashConfig };

const BACKUP_PATH: string = tmpdir();
Expand Down Expand Up @@ -73,6 +75,7 @@ export class Unleash extends EventEmitter {
storageProvider,
disableAutoStart = false,
skipInstanceCountWarning = false,
experimentalMode = { type: 'polling' },
}: UnleashConfig) {
super();

Expand Down Expand Up @@ -123,6 +126,17 @@ export class Unleash extends EventEmitter {
tags,
bootstrapProvider,
bootstrapOverride,
eventSource:
experimentalMode?.type === 'streaming'
? new EventSource(resolveUrl(unleashUrl, './client/streaming'), {
headers: customHeaders,
readTimeoutMillis: 60000, // start a new SSE connection when no heartbeat received in 1 minute
initialRetryDelayMillis: 2000,
maxBackoffMillis: 30000,
retryResetIntervalMillis: 60000,
jitterRatio: 0.5,
})
: undefined,
storageProvider: storageProvider || new FileStorageProvider(backupPath),
});

Expand Down
10 changes: 10 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,11 @@
dependencies:
"@types/node" "*"

"@types/eventsource@^1.1.15":
version "1.1.15"
resolved "https://registry.yarnpkg.com/@types/eventsource/-/eventsource-1.1.15.tgz#949383d3482e20557cbecbf3b038368d94b6be27"
integrity sha512-XQmGcbnxUNa06HR3VBVkc9+A2Vpi9ZyLJcdS5dwaQQ/4ZMWFO+5c90FnMUpbtMZwB/FChoYHwuVg8TvkECacTA==

"@types/express-serve-static-core@^4.17.33":
version "4.17.41"
resolved "https://registry.yarnpkg.com/@types/express-serve-static-core/-/express-serve-static-core-4.17.41.tgz#5077defa630c2e8d28aa9ffc2c01c157c305bef6"
Expand Down Expand Up @@ -3311,6 +3316,11 @@ kind-of@^6.0.3:
resolved "https://registry.yarnpkg.com/kind-of/-/kind-of-6.0.3.tgz#07c05034a6c349fa06e24fa35aa76db4580ce4dd"
integrity sha512-dcS1ul+9tmeD95T+x28/ehLgd9mENa3LsvDTtzm3vyBEO7RPptvAD+t44WVXaUjTBRcrpFeFlC8WCruUR456hw==

[email protected]:
version "2.0.3"
resolved "https://registry.yarnpkg.com/launchdarkly-eventsource/-/launchdarkly-eventsource-2.0.3.tgz#8a7b8da5538153f438f7d452b1c87643d900f984"
integrity sha512-VhFjppK7jXlcEKaS7bxdoibB5j01NKyeDR7a8XfssdDGNWCTsbF0/5IExSmPi44eDncPhkoPNxlSZhEZvrbD5w==

lcov-parse@^1.0.0:
version "1.0.0"
resolved "https://registry.npmjs.org/lcov-parse/-/lcov-parse-1.0.0.tgz"
Expand Down

0 comments on commit 9a0fd22

Please sign in to comment.