Skip to content
This repository has been archived by the owner on Oct 11, 2023. It is now read-only.

Commit

Permalink
Merge pull request #136 from amarzavery/neweph
Browse files Browse the repository at this point in the history
While handling disconnects, make more attempts if the error is retryable
  • Loading branch information
amarzavery authored Aug 30, 2018
2 parents e7dd9a5 + 950e822 commit b0413d6
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 21 deletions.
6 changes: 6 additions & 0 deletions client/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### 2018-08-29 0.2.7
- Improved logging statements to the connection context.
- Added timeout to promisifed creation/closing of rhea sender, receiver, session, connection.
- Fixed a bug in the EventData deserialization logic by checking for `!= undefined` check rather than the `!` check.
- While handling disconnects we retry for 150 times at an interval of 15 seconds as long the error is `retryable`.

### 2018-08-07 0.2.6
- Improved log statements.
- Documented different mechanisms of getting the debug logs in [README](https://github.com/Azure/azure-event-hubs-node/tree/master/client#debug-logs).
Expand Down
5 changes: 3 additions & 2 deletions client/lib/amqp-common/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import { translate, MessagingError } from "./errors";
import { delay } from ".";
import * as log from "./log";
import { defaultRetryAttempts, defaultDelayBetweenRetriesInSeconds } from "./util/constants";

/**
* Determines whether the object is a Delivery object.
Expand Down Expand Up @@ -100,8 +101,8 @@ function validateRetryConfig<T>(config: RetryConfig<T>): void {
*/
export async function retry<T>(config: RetryConfig<T>): Promise<T> {
validateRetryConfig(config);
if (!config.times) config.times = 3;
if (!config.delayInSeconds) config.delayInSeconds = 15;
if (config.times == undefined) config.times = defaultRetryAttempts;
if (config.delayInSeconds == undefined) config.delayInSeconds = defaultDelayBetweenRetriesInSeconds;
let lastError: MessagingError | undefined;
let result: any;
let success = false;
Expand Down
4 changes: 3 additions & 1 deletion client/lib/amqp-common/util/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,6 @@ export const maxAbsoluteExpiryTime = new Date("9999-12-31T07:59:59.000Z").getTim
export const aadTokenValidityMarginSeconds = 5;
export const connectionReconnectDelay = 300;
export const defaultRetryAttempts = 3;
export const defaultDelayBetweenRetriesInSeconds = 5;
export const defaultConnectionRetryAttempts = 150;
export const defaultDelayBetweenOperationRetriesInSeconds = 5;
export const defaultDelayBetweenRetriesInSeconds = 15;
2 changes: 1 addition & 1 deletion client/lib/amqp-common/util/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export function getNewAsyncLock(options?: AsyncLockOptions): AsyncLock {
/**
* @constant {AsyncLock} defaultLock The async lock instance with default settings.
*/
export const defaultLock: AsyncLock = new AsyncLock();
export const defaultLock: AsyncLock = new AsyncLock({ maxPending: 10000 });

/**
* Describes a Timeout class that can wait for the specified amount of time and then resolve/reject
Expand Down
13 changes: 8 additions & 5 deletions client/lib/eventHubReceiver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import { EventData } from "./eventData";
import { ReceiveOptions } from "./eventHubClient";
import { ConnectionContext } from "./connectionContext";
import { LinkEntity } from "./linkEntity";
import { EventPosition } from './eventPosition';
import { EventPosition } from "./eventPosition";

interface CreateReceiverOptions {
onMessage: OnAmqpEvent;
Expand Down Expand Up @@ -224,7 +224,7 @@ export class EventHubReceiver extends LinkEntity {
const ehError = translate(receiverError);
log.error("[%s] An error occurred for Receiver '%s': %O.",
this._context.connectionId, this.name, ehError);
if (this._receiver && this._receiver.isClosed()) {
if (this._receiver && !this._receiver.isClosed()) {
this._onError!(ehError);
}
}
Expand All @@ -236,7 +236,7 @@ export class EventHubReceiver extends LinkEntity {
const ehError = translate(sessionError);
log.error("[%s] An error occurred on the session for Receiver '%s': %O.",
this._context.connectionId, this.name, ehError);
if (this._receiver && this._receiver.isSessionClosed()) {
if (this._receiver && !this._receiver.isSessionClosed()) {
this._onError!(ehError);
}
}
Expand Down Expand Up @@ -355,11 +355,14 @@ export class EventHubReceiver extends LinkEntity {
rcvrOptions.eventPosition = EventPosition.fromSequenceNumber(this._checkpoint.sequenceNumber);
}
const options: ReceiverOptions = this._createReceiverOptions(rcvrOptions);
// shall retry 3 times at an interval of 15 seconds and bail out.
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig<void> = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.receiverLink
operationType: RetryOperationType.receiverLink,
times: Constants.defaultConnectionRetryAttempts,
delayInSeconds: 15
};
await retry<void>(config);
}
Expand Down
9 changes: 6 additions & 3 deletions client/lib/eventHubSender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,11 +196,14 @@ export class EventHubSender extends LinkEntity {
const options: SenderOptions = this._createSenderOptions({
newName: true
});
// shall retry 3 times at an interval of 15 seconds and bail out.
// shall retry forever at an interval of 15 seconds if the error is a retryable error
// else bail out when the error is not retryable or the oepration succeeds.
const config: RetryConfig<void> = {
operation: () => this._init(options),
connectionId: this._context.connectionId,
operationType: RetryOperationType.senderLink
operationType: RetryOperationType.senderLink,
times: Constants.defaultConnectionRetryAttempts,
delayInSeconds: 15
};
return retry<void>(config);
});
Expand Down Expand Up @@ -458,7 +461,7 @@ export class EventHubSender extends LinkEntity {
connectionId: this._context.connectionId,
operationType: RetryOperationType.sendMessage,
times: Constants.defaultRetryAttempts,
delayInSeconds: Constants.defaultDelayBetweenRetriesInSeconds + jitterInSeconds
delayInSeconds: Constants.defaultDelayBetweenOperationRetriesInSeconds + jitterInSeconds
};
return retry<Delivery>(config);
}
Expand Down
8 changes: 4 additions & 4 deletions client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "azure-event-hubs",
"version": "0.2.6",
"version": "0.2.7",
"description": "Azure Event Hubs SDK for Node.js",
"author": "Microsoft Corporation",
"license": "MIT",
Expand All @@ -13,7 +13,7 @@
"jssha": "^2.3.1",
"ms-rest": "^2.3.6",
"ms-rest-azure": "^2.5.7",
"rhea": "^0.2.20",
"rhea": "^0.3.1",
"tslib": "^1.9.3",
"uuid": "^3.3.2"
},
Expand Down
2 changes: 1 addition & 1 deletion client/tests/eventdata.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const testMessage: Message = {
const testEventData = EventData.fromAmqpMessage(testMessage);
const messageFromED = EventData.toAmqpMessage(testEventData);

describe.only("EventData", function () {
describe("EventData", function () {
describe("fromAmqpMessage", function () {
it("populates annotations with the message annotations", function () {
testEventData.annotations!.should.equal(testAnnotations);
Expand Down
170 changes: 168 additions & 2 deletions client/tests/retry.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,15 @@
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

import "mocha";
import { retry, translate, RetryConfig, RetryOperationType } from "../lib/amqp-common/";
import { retry, translate, RetryConfig, RetryOperationType, Constants } from "../lib/amqp-common/";
import * as chai from "chai";
import { delay, MessagingError } from "../lib";
import * as debugModule from "debug";
const debug = debugModule("azure:event-hubs:retry-spec");
const should = chai.should();

describe("retry function", function () {

it("should succeed if the operation succeeds.", async function () {
let counter = 0;
try {
Expand Down Expand Up @@ -146,7 +147,11 @@ describe("retry function", function () {
e.retryable = true;
throw e;
} else {
throw new Error("I would like to fail.");
const x: any = {
condition: "com.microsoft:message-lock-lost",
description: "I would like to fail."
}
throw x;
}
},
connectionId: "connection-1",
Expand Down Expand Up @@ -187,4 +192,165 @@ describe("retry function", function () {
counter.should.equal(4);
}
});

describe("with config.times set to Infinity", function () {
it("should succeed if the operation succeeds.", async function () {
let counter = 0;
try {
const config: RetryConfig<any> = {
times: Infinity,
operation: async () => {
debug("counter: %d", ++counter);
await delay(200);
return {
code: 200,
description: "OK"
}
},
connectionId: "connection-1",
operationType: RetryOperationType.cbsAuth
};
const result = await retry(config);
result.code.should.equal(200);
result.description.should.equal("OK");
counter.should.equal(1);
} catch (err) {
debug("An error occurred in a test that should have succeeded: %O", err);
throw err;
}
});

it("should fail if the operation returns a non retryable error", async function () {
let counter = 0;
try {
const config: RetryConfig<any> = {
operation: async () => {
debug("counter: %d", ++counter);
await delay(200);
throw translate({
condition: "amqp:precondition-failed",
description: "I would like to fail, not retryable."
});
},
connectionId: "connection-1",
operationType: RetryOperationType.management,
times: Infinity
};
await retry(config);
} catch (err) {
should.exist(err);
should.equal(true, err instanceof MessagingError);
err.message.should.equal("I would like to fail, not retryable.");
counter.should.equal(1);
}
});

it("should succeed if the operation initially fails with a retryable error and then succeeds.", async function () {
let counter = 0;
try {
const config: RetryConfig<any> = {
operation: async () => {
await delay(200);
debug("counter: %d", ++counter);
if (counter == 1) {
throw translate({
condition: "com.microsoft:server-busy",
description: "The server is busy right now. Retry later."
});
} else {
return {
code: 200,
description: "OK"
};
}
},
connectionId: "connection-1",
operationType: RetryOperationType.receiverLink,
times: Infinity,
delayInSeconds: 0.5
};
const result = await retry(config);
result.code.should.equal(200);
result.description.should.equal("OK");
counter.should.equal(2);
} catch (err) {
debug("An error occurred in a test that should have succeeded: %O", err);
throw err;
}
});

it("should succeed in the last attempt.", async function () {
let counter = 0;
try {
const config: RetryConfig<any> = {
operation: async () => {
await delay(200);
debug("counter: %d", ++counter);
if (counter == 1) {
const e = new MessagingError("A retryable error.");
e.retryable = true;
throw e;
} else if (counter == 2) {
const e = new MessagingError("A retryable error.");
e.retryable = true;
throw e;
} else {
return {
code: 200,
description: "OK"
};
}
},
connectionId: "connection-1",
operationType: RetryOperationType.senderLink,
times: Infinity,
delayInSeconds: 0.5
};
const result = await retry(config);
result.code.should.equal(200);
result.description.should.equal("OK");
counter.should.equal(3);
} catch (err) {
debug("An error occurred in a test that should have succeeded: %O", err);
throw err;
}
});

it("should fail if the last attempt return a non-retryable error", async function () {
let counter = 0;
try {
const config: RetryConfig<any> = {
operation: async () => {
await delay(200);
debug("counter: %d", ++counter);
if (counter == 1) {
const e = new MessagingError("A retryable error.");
e.retryable = true;
throw e;
} else if (counter == 2) {
const e = new MessagingError("A retryable error.");
e.retryable = true;
throw e;
} else {
const x: any = {
condition: "com.microsoft:message-lock-lost",
description: "I would like to fail."
}
throw x;
}
},
connectionId: "connection-1",
operationType: RetryOperationType.sendMessage,
times: Constants.defaultConnectionRetryAttempts,
delayInSeconds: 0.0001
};
await retry(config);
} catch (err) {
should.exist(err);
should.equal(true, err instanceof MessagingError);
err.message.should.equal("I would like to fail.");
counter.should.equal(3);
}
});
});
});

0 comments on commit b0413d6

Please sign in to comment.