Skip to content

Commit

Permalink
feat(core): subscriptions closed promise can now resolve to void or…
Browse files Browse the repository at this point in the history
… Error - if error the subscription was closed due to an error. Note that it doesn't reject, but rather resolves to the Error. (#146)

Signed-off-by: Alberto Ricart <[email protected]>
  • Loading branch information
aricart authored Nov 19, 2024
1 parent d0dfb1a commit b8bb419
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 10 deletions.
9 changes: 7 additions & 2 deletions core/src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -579,8 +579,13 @@ export function syncIterator<T>(src: AsyncIterable<T>): SyncIterator<T> {
* Basic interface to a Subscription type
*/
export interface Subscription extends AsyncIterable<Msg> {
/** A promise that resolves when the subscription closes */
closed: Promise<void>;
/**
* A promise that resolves when the subscription closes. If the promise
* resolves to an error, the subscription was closed because of an error
* typically a permissions error. Note that this promise doesn't reject, but
* rather resolves to void (no error) or an Error
*/
closed: Promise<void | Error>;

/**
* Stop the subscription from receiving messages. You can optionally
Expand Down
14 changes: 7 additions & 7 deletions core/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
timer?: Timeout<void>;
info?: unknown;
cleanupFn?: (sub: Subscription, info?: unknown) => void;
closed: Deferred<void>;
closed: Deferred<void | Error>;
requestSubject?: string;
slow?: SlowNotifier;

Expand All @@ -156,7 +156,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
this.subject = subject;
this.draining = false;
this.noIterator = typeof opts.callback === "function";
this.closed = deferred();
this.closed = deferred<void | Error>();

const asyncTraces = !(protocol.options?.noAsyncTraces || false);

Expand All @@ -178,8 +178,8 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
if (!this.noIterator) {
// cleanup - they used break or return from the iterator
// make sure we clean up, if they didn't call unsub
this.iterClosed.then(() => {
this.closed.resolve();
this.iterClosed.then((err: void | Error) => {
this.closed.resolve(err);
this.unsubscribe();
});
}
Expand All @@ -203,7 +203,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
}
}

close(): void {
close(err?: Error): void {
if (!this.isClosed()) {
this.cancelTimeout();
const fn = () => {
Expand All @@ -215,7 +215,7 @@ export class SubscriptionImpl extends QueuedIteratorImpl<Msg>
// ignoring
}
}
this.closed.resolve();
this.closed.resolve(err);
};

if (this.noIterator) {
Expand Down Expand Up @@ -350,7 +350,7 @@ export class Subscriptions {
}
if (sub) {
sub.callback(err, {} as Msg);
sub.close();
sub.close(err);
this.subs.delete(sub.sid);
return sub !== this.mux;
}
Expand Down
8 changes: 7 additions & 1 deletion core/tests/auth_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,9 @@ Deno.test("auth - perm sub iterator error", async () => {
`Permissions Violation for Subscription to "q"`,
);

const err = await sub.closed;
assertEquals(err instanceof errors.PermissionViolationError, true);

await cleanup(ns, nc);
});

Expand All @@ -967,7 +970,7 @@ Deno.test("auth - perm error is not in lastError", async () => {
assertEquals(nci.protocol.lastError, undefined);

const d = deferred<Error | null>();
nc.subscribe("q", {
const sub = nc.subscribe("q", {
callback: (err) => {
d.resolve(err);
},
Expand All @@ -978,6 +981,9 @@ Deno.test("auth - perm error is not in lastError", async () => {
assert(err instanceof errors.PermissionViolationError);
assert(nci.protocol.lastError === undefined);

const err2 = await sub.closed;
assert(err2 instanceof errors.PermissionViolationError);

await cleanup(ns, nc);
});

Expand Down
2 changes: 2 additions & 0 deletions migration.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ these modules for cross-runtime consumption.
data and are easier to use from different modules, since you can provide the
string name of the type. For more information see
[Lifecycle and Informational and Events](core/README.md#lifecycle-and-informational-events)
- Subscription#closed now resolves to void or an Error (it doesn't throw). The
error is the reason why the subscription closed.

## Changes in JetStream

Expand Down

0 comments on commit b8bb419

Please sign in to comment.