Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

balance rdy #406

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 51 additions & 39 deletions lib/readerrdy.js
Original file line number Diff line number Diff line change
Expand Up @@ -424,15 +424,13 @@ class ReaderRdy extends NodeState {
* @param {ConnectionRdy} connectionRdy
*/
onMessageSuccess(connectionRdy) {
if (!this.isPaused()) {
if (this.isLowRdy()) {
// Balance the RDY count amoung existing connections given the
// low RDY condition.
this.balance()
} else {
// Restore RDY count for connection to the connection max.
connectionRdy.bump()
}
if (this.isLowRdy()) {
// Balance the RDY count amoung existing connections given the
// low RDY condition.
this.raise('balance')
} else {
// Restore RDY count for connection to the connection max.
connectionRdy.bump()
743v45 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -446,7 +444,6 @@ class ReaderRdy extends NodeState {

conn.on(NSQDConnection.CLOSED, () => {
this.removeConnection(connectionRdy)
this.balance()
})

conn.on(NSQDConnection.FINISHED, () => this.raise('success', connectionRdy))
Expand All @@ -471,8 +468,8 @@ class ReaderRdy extends NodeState {

this.connections.push(connectionRdy)
this.roundRobinConnections.add(connectionRdy)
this.raise('balance')

this.balance()
if (this.current_state_name === 'ZERO') {
this.goto('MAX')
} else if (['TRY_ONE', 'MAX'].includes(this.current_state_name)) {
Expand All @@ -481,6 +478,8 @@ class ReaderRdy extends NodeState {
})
}



/**
* Remove a connection from the pool.
*
Expand All @@ -489,6 +488,7 @@ class ReaderRdy extends NodeState {
removeConnection(conn) {
this.connections.splice(this.connections.indexOf(conn), 1)
this.roundRobinConnections.remove(conn)
this.raise('balance')

if (this.connections.length === 0) {
this.goto('ZERO')
Expand All @@ -504,18 +504,11 @@ class ReaderRdy extends NodeState {
return this.connections.map((conn) => conn.bump())
}

/**
* Try to balance the connection pool.
*/
try() {
this.balance()
}

/**
* Raise a `BACKOFF` event for each connection in the pool.
*/
backoff() {
this.connections.forEach((conn) => conn.backoff())
this.backoffAllConn()

if (this.backoffId) {
clearTimeout(this.backoffId)
Expand Down Expand Up @@ -551,6 +544,7 @@ class ReaderRdy extends NodeState {
case 'TRY_ONE':
return 1
case 'PAUSE':
case 'ZERO':
return 0
default:
return this.maxInFlight
Expand All @@ -569,38 +563,33 @@ class ReaderRdy extends NodeState {
* the RDY count is distributed to the next waiting connection. If
* the connection does nothing with it's RDY count, then it should
* timeout and give it's RDY count to another connection.
* @param {Number} maxConnectionsRdy
*/
balance() {
balance(maxConnectionsRdy) {
this.log('balance')
clearTimeout(this.balanceId)
this.balanceId = null;
743v45 marked this conversation as resolved.
Show resolved Hide resolved
if (!this.connections.length) return;

if (this.balanceId != null) {
clearTimeout(this.balanceId)
this.balanceId = null
}

const max = this.maxConnectionsRdy()
const perConnectionMax = Math.floor(max / this.connections.length)
const perConnectionMax = Math.floor(maxConnectionsRdy / this.connections.length)

// Low RDY and try conditions
if (perConnectionMax === 0) {
/**
* Backoff on all connections. In-flight messages from
* connections will still be processed.
*/
this.connections.forEach((conn) => conn.backoff())
this.backoffAllConn()

const nextConnCount = Math.max(maxConnectionsRdy - this.inFlight(), 0);
// Distribute available RDY count to the connections next in line.
this.roundRobinConnections.next(max - this.inFlight()).forEach((conn) => {
this.roundRobinConnections.next(nextConnCount).forEach((conn) => {
conn.setConnectionRdyMax(1)
conn.bump()
})

// Rebalance periodically. Needed when no messages are received.
this.balanceId = setTimeout(() => {
this.balance()
this.raise('balance')
}, this.lowRdyTimeout)
} else {
let rdyRemainder = this.maxInFlight % this.connections.length
let rdyRemainder = maxConnectionsRdy % this.connections.length
this.connections.forEach((c) => {
let connMax = perConnectionMax

Expand All @@ -618,6 +607,20 @@ class ReaderRdy extends NodeState {
})
}
}

backoffAllConn() {
/**
* for ZERO state
*/
if (!this.connections) {
return;
}
/**
* Backoff on all connections. In-flight messages from
* connections will still be processed.
*/
this.connections.forEach((conn) => conn.backoff())
}
}

/**
Expand All @@ -631,13 +634,15 @@ class ReaderRdy extends NodeState {
ReaderRdy.prototype.states = {
ZERO: {
Enter() {
this.backoffAllConn()
if (this.backoffId) {
return clearTimeout(this.backoffId)
}
},
backoff() {}, // No-op
success() {}, // No-op
try() {}, // No-op
balance() {}, // No-op
pause() {
// No-op
return this.goto('PAUSE')
Expand All @@ -647,11 +652,12 @@ ReaderRdy.prototype.states = {

PAUSE: {
Enter() {
return this.connections.map((conn) => conn.backoff())
this.backoffAllConn()
},
backoff() {}, // No-op
success() {}, // No-op
try() {}, // No-op
balance() {}, // No-op
pause() {}, // No-op
unpause() {
return this.goto('TRY_ONE')
Expand All @@ -660,7 +666,7 @@ ReaderRdy.prototype.states = {

TRY_ONE: {
Enter() {
return this.try()
return this.raise('balance')
},
backoff() {
return this.goto('BACKOFF')
Expand All @@ -671,6 +677,9 @@ ReaderRdy.prototype.states = {
return this.goto('MAX')
},
try() {}, // No-op
balance() {
this.balance(1);
},
pause() {
return this.goto('PAUSE')
},
Expand All @@ -679,8 +688,7 @@ ReaderRdy.prototype.states = {

MAX: {
Enter() {
this.balance()
return this.bump()
this.raise('balance')
},
backoff() {
return this.goto('BACKOFF')
Expand All @@ -690,6 +698,9 @@ ReaderRdy.prototype.states = {
return this.onMessageSuccess(connectionRdy)
},
try() {}, // No-op
balance() {
this.balance(this.maxInFlight);
},
pause() {
return this.goto('PAUSE')
},
Expand All @@ -706,6 +717,7 @@ ReaderRdy.prototype.states = {
try() {
return this.goto('TRY_ONE')
},
balance() {}, // No-op
pause() {
return this.goto('PAUSE')
},
Expand Down