Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve suspend/destroy #161

Merged
merged 3 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -474,16 +474,22 @@ module.exports = class Hyperswarm extends EventEmitter {
async suspend () {
if (this.suspended) return

const promises = []

promises.push(this.server.suspend())

for (const discovery of this._discovery.values()) {
discovery.suspend()
promises.push(discovery.suspend())
}

for (const connection of this._allConnections) {
connection.destroy()
}

await this.dht.suspend()
promises.push(this.dht.suspend())
this.suspended = true

await Promise.allSettled(promises)
}

async resume () {
Expand Down
50 changes: 35 additions & 15 deletions lib/peer-discovery.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const safetyCatch = require('safety-catch')
const b4a = require('b4a')

const REFRESH_INTERVAL = 1000 * 60 * 10 // 10 min
const RANDOM_JITTER = 1000 * 60 * 2 // 2 min
Expand Down Expand Up @@ -56,15 +57,20 @@ module.exports = class PeerDiscovery {
}, delay)
}

_isActive () {
return !this.destroyed && !this.suspended
}

// TODO: Allow announce to be an argument to this
// TODO: Maybe announce should be a setter?
async _refresh () {
if (this.suspended) return
const clock = ++this._refreshes

if (this._wait) {
await this._wait
this._wait = null
if (clock !== this._refreshes) return
if (clock !== this._refreshes || !this._isActive()) return
}

const clear = this.isServer && this._firstAnnounce
Expand All @@ -78,7 +84,7 @@ module.exports = class PeerDiscovery {
if (this.isServer) {
await this.swarm.listen()
// if a parallel refresh is happening, yield to the new one
if (clock !== this._refreshes) return
if (clock !== this._refreshes || !this._isActive()) return
this._needsUnannounce = true
}

Expand All @@ -91,15 +97,17 @@ module.exports = class PeerDiscovery {

try {
for await (const data of this._activeQuery) {
if (!this.isClient) continue
if (!this.isClient || !this._isActive()) continue
for (const peer of data.peers) {
this._onpeer(peer, data)
}
}
} catch (err) {
if (this._isActive()) throw err
} finally {
if (this._activeQuery === query) {
this._activeQuery = null
if (!this.destroyed) this._refreshLater(false)
if (!this.destroyed && !this.suspended) this._refreshLater(false)
}
}

Expand Down Expand Up @@ -168,10 +176,7 @@ module.exports = class PeerDiscovery {
return this.destroying
}

async _destroy () {
if (this.destroyed) return
this.destroyed = true

async _abort () {
if (this._wait) await this._wait

if (this._activeQuery) {
Expand All @@ -183,6 +188,8 @@ module.exports = class PeerDiscovery {
this._timer = null
}

const nodes = this._closestNodes

if (this._currentRefresh) {
try {
await this._currentRefresh
Expand All @@ -191,20 +198,33 @@ module.exports = class PeerDiscovery {
}
}

if (this._isActive()) return

if (this._closestNodes !== nodes) {
for (const node of this._closestNodes) {
for (const n of nodes) {
if (!n.id || !node.id || b4a.equals(n.id, node.id)) continue
nodes.push(n)
}
}
}

if (this._needsUnannounce) {
await this.swarm.dht.unannounce(this.topic, this.swarm.keyPair)
await this.swarm.dht.unannounce(this.topic, this.swarm.keyPair, { closestNodes: nodes, onlyClosestNodes: true })
this._needsUnannounce = false
}
}

suspend () {
_destroy () {
if (this.destroyed) return
this.destroyed = true
return this._abort()
}

async suspend () {
if (this.suspended) return
this.suspended = true

if (this._timer) {
clearTimeout(this._timer)
this._timer = null
}
return this._abort()
}

resume () {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"dependencies": {
"b4a": "^1.3.1",
"events": "^3.3.0",
"hyperdht": "^6.7.0",
"hyperdht": "^6.10.0",
"safety-catch": "^1.0.2",
"shuffled-priority-queue": "^2.1.0"
},
Expand Down