Skip to content

Commit

Permalink
fix: Ensure that each request can only fail once. (#28)
Browse files Browse the repository at this point in the history
On WSL with no network, or running docker without a network, the 'error'
and 'timeout' callbacks would both be called. The timeout should only
happen after the specified timeout, 5 minutes in the node SDK, but
happens immediately under either of those conditions.

This change ensures that each request can only fail one time.
  • Loading branch information
kinyoklion authored May 22, 2024
1 parent ada7031 commit bcceb35
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 62 deletions.
132 changes: 78 additions & 54 deletions example/eventsource-polyfill.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,27 +67,27 @@
/* 0 */
/***/ (function(module, exports) {

var g;

// This works in non-strict mode
g = (function() {
return this;
})();

try {
// This works if eval is allowed (see CSP)
g = g || Function("return this")() || (1,eval)("this");
} catch(e) {
// This works if the window reference is available
if(typeof window === "object")
g = window;
}

// g can still be undefined, but nothing to do about it...
// We return undefined, instead of nothing here, so it's
// easier to handle this case. if(!global) { ...}

module.exports = g;
var g;

// This works in non-strict mode
g = (function() {
return this;
})();

try {
// This works if eval is allowed (see CSP)
g = g || Function("return this")() || (1,eval)("this");
} catch(e) {
// This works if the window reference is available
if(typeof window === "object")
g = window;
}

// g can still be undefined, but nothing to do about it...
// We return undefined, instead of nothing here, so it's
// easier to handle this case. if(!global) { ...}

module.exports = g;


/***/ }),
Expand Down Expand Up @@ -4675,7 +4675,7 @@ var IncomingMessage = exports.IncomingMessage = function (xhr, response, mode, f
self.url = response.url
self.statusCode = response.status
self.statusMessage = response.statusText

response.headers.forEach(function (header, key){
self.headers[key.toLowerCase()] = header
self.rawHeaders.push(key, header)
Expand Down Expand Up @@ -4805,7 +4805,7 @@ IncomingMessage.prototype._onXHRProgress = function () {
self.push(new Buffer(response))
break
}
// Falls through in IE8
// Falls through in IE8
case 'text':
try { // This will fail when readyState = 3 in IE9. Switch mode and wait for readyState = 4
response = xhr.responseText
Expand Down Expand Up @@ -7264,6 +7264,19 @@ function hasBom (buf) {
})
}

/**
* Wrap a callback to ensure it can only be called once.
*/
function once(cb) {
let called = false
return (...params) => {
if(!called) {
called = true
cb(...params)
}
}
}

/**
* Creates a new EventSource object
*
Expand Down Expand Up @@ -7423,17 +7436,25 @@ function EventSource (url, eventSourceInitDict) {
}, delay)
}

function destroyRequest() {
if (req.destroy) req.destroy()
if (req.xhr && req.xhr.abort) req.xhr.abort()
}

function connect () {
var urlAndOptions = makeRequestUrlAndOptions()
var isSecure = urlAndOptions.options.protocol === 'https:' ||
(urlAndOptions.url && urlAndOptions.url.startsWith('https:'))

// Each request should be able to fail at most once.
const failOnce = once(failed)

var callback = function (res) {
// Handle HTTP redirects
if (res.statusCode === 301 || res.statusCode === 307) {
if (!res.headers.location) {
// Server sent redirect response without Location header.
failed({ status: res.statusCode, message: res.statusMessage })
failOnce({ status: res.statusCode, message: res.statusMessage })
return
}
if (res.statusCode === 307) reconnectUrl = url
Expand All @@ -7444,7 +7465,7 @@ function EventSource (url, eventSourceInitDict) {

// Handle HTTP errors
if (res.statusCode !== 200) {
failed({ status: res.statusCode, message: res.statusMessage })
failOnce({ status: res.statusCode, message: res.statusMessage })
return
}

Expand All @@ -7456,13 +7477,13 @@ function EventSource (url, eventSourceInitDict) {
res.on('close', function () {
res.removeAllListeners('close')
res.removeAllListeners('end')
failed()
failOnce()
})

res.on('end', function () {
res.removeAllListeners('close')
res.removeAllListeners('end')
failed()
failOnce()
})
_emit(new Event('open'))

Expand Down Expand Up @@ -7561,12 +7582,14 @@ function EventSource (url, eventSourceInitDict) {
}

req.on('error', function (err) {
failed({ message: err.message })
failOnce({ message: err.message })
})

req.on('timeout', function () {
failed({ message: 'Read timeout, received no data in ' + config.readTimeoutMillis +
failOnce({ message: 'Read timeout, received no data in ' + config.readTimeoutMillis +
'ms, assuming connection is dead' })
// Timeout doesn't mean that the request is cancelled, just that it has elapsed the timeout.
destroyRequest()
})

if (req.setNoDelay) req.setNoDelay(true)
Expand All @@ -7584,8 +7607,9 @@ function EventSource (url, eventSourceInitDict) {
this._close = function () {
if (readyState === EventSource.CLOSED) return
readyState = EventSource.CLOSED
if (req.abort) req.abort()
if (req.xhr && req.xhr.abort) req.xhr.abort()

destroyRequest()

_emit(new Event('closed'))
}

Expand Down Expand Up @@ -8711,28 +8735,28 @@ module.exports = CalculateCapacity
/* 33 */
/***/ (function(module, exports) {

module.exports = function(module) {
if(!module.webpackPolyfill) {
module.deprecate = function() {};
module.paths = [];
// module.parent = undefined by default
if(!module.children) module.children = [];
Object.defineProperty(module, "loaded", {
enumerable: true,
get: function() {
return module.l;
}
});
Object.defineProperty(module, "id", {
enumerable: true,
get: function() {
return module.i;
}
});
module.webpackPolyfill = 1;
}
return module;
};
module.exports = function(module) {
if(!module.webpackPolyfill) {
module.deprecate = function() {};
module.paths = [];
// module.parent = undefined by default
if(!module.children) module.children = [];
Object.defineProperty(module, "loaded", {
enumerable: true,
get: function() {
return module.l;
}
});
Object.defineProperty(module, "id", {
enumerable: true,
get: function() {
return module.i;
}
});
module.webpackPolyfill = 1;
}
return module;
};


/***/ }),
Expand Down Expand Up @@ -12056,4 +12080,4 @@ module.exports = function isBuffer(arg) {
}

/***/ })
/******/ ]);
/******/ ]);
40 changes: 32 additions & 8 deletions lib/eventsource.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ function hasBom (buf) {
})
}

/**
* Wrap a callback to ensure it can only be called once.
*/
function once(cb) {
let called = false
return (...params) => {
if(!called) {
called = true
cb(...params)
}
}
}

/**
* Creates a new EventSource object
*
Expand Down Expand Up @@ -186,17 +199,25 @@ function EventSource (url, eventSourceInitDict) {
}, delay)
}

function destroyRequest() {
if (req.destroy) req.destroy()
if (req.xhr && req.xhr.abort) req.xhr.abort()
}

function connect () {
var urlAndOptions = makeRequestUrlAndOptions()
var isSecure = urlAndOptions.options.protocol === 'https:' ||
(urlAndOptions.url && urlAndOptions.url.startsWith('https:'))

// Each request should be able to fail at most once.
const failOnce = once(failed)

var callback = function (res) {
// Handle HTTP redirects
if (res.statusCode === 301 || res.statusCode === 307) {
if (!res.headers.location) {
// Server sent redirect response without Location header.
failed({ status: res.statusCode, message: res.statusMessage })
failOnce({ status: res.statusCode, message: res.statusMessage })
return
}
if (res.statusCode === 307) reconnectUrl = url
Expand All @@ -207,7 +228,7 @@ function EventSource (url, eventSourceInitDict) {

// Handle HTTP errors
if (res.statusCode !== 200) {
failed({ status: res.statusCode, message: res.statusMessage })
failOnce({ status: res.statusCode, message: res.statusMessage })
return
}

Expand All @@ -219,13 +240,13 @@ function EventSource (url, eventSourceInitDict) {
res.on('close', function () {
res.removeAllListeners('close')
res.removeAllListeners('end')
failed()
failOnce()
})

res.on('end', function () {
res.removeAllListeners('close')
res.removeAllListeners('end')
failed()
failOnce()
})
_emit(new Event('open'))

Expand Down Expand Up @@ -324,12 +345,14 @@ function EventSource (url, eventSourceInitDict) {
}

req.on('error', function (err) {
failed({ message: err.message })
failOnce({ message: err.message })
})

req.on('timeout', function () {
failed({ message: 'Read timeout, received no data in ' + config.readTimeoutMillis +
failOnce({ message: 'Read timeout, received no data in ' + config.readTimeoutMillis +
'ms, assuming connection is dead' })
// Timeout doesn't mean that the request is cancelled, just that it has elapsed the timeout.
destroyRequest()
})

if (req.setNoDelay) req.setNoDelay(true)
Expand All @@ -347,8 +370,9 @@ function EventSource (url, eventSourceInitDict) {
this._close = function () {
if (readyState === EventSource.CLOSED) return
readyState = EventSource.CLOSED
if (req.abort) req.abort()
if (req.xhr && req.xhr.abort) req.xhr.abort()

destroyRequest()

_emit(new Event('closed'))
}

Expand Down

0 comments on commit bcceb35

Please sign in to comment.