-
Notifications
You must be signed in to change notification settings - Fork 8
/
index.js
122 lines (117 loc) · 3.76 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import net from 'net';
import tls from 'tls';
import { createHash, pbkdf2 as _pbkdf2, randomFillSync } from 'crypto';
import { once } from 'events';
import { promisify } from 'util';
import { _net, SaslScramSha256 } from './mod.js';
export * from './mod.js';
// Promise-returning variant of the callback-based crypto.pbkdf2.
const pbkdf2 = promisify(_pbkdf2);

// Install Node.js-backed primitives (base64, randomness, SHA-256, PBKDF2)
// onto SaslScramSha256.prototype.
Object.assign(SaslScramSha256.prototype, {
  /** Encodes bytes as a base64 string. */
  _b64encode(bytes) {
    const asBuffer = Buffer.from(bytes);
    return asBuffer.toString('base64');
  },

  /** Decodes a base64 string into a plain Uint8Array copy. */
  _b64decode(b64) {
    const decoded = Buffer.from(b64, 'base64');
    return Uint8Array.from(decoded);
  },

  /** Returns a new Uint8Array of length `n` filled by crypto.randomFillSync. */
  _randomBytes(n) {
    const bytes = new Uint8Array(n);
    return randomFillSync(bytes);
  },

  /** Resolves with the SHA-256 digest of `val` as a Uint8Array. */
  async _hash(val) {
    const hasher = createHash('sha256');
    hasher.update(val);
    const digest = hasher.digest();
    return Uint8Array.from(digest);
  },

  /**
   * Resolves with a 32-byte PBKDF2 key derived with SHA-256 from
   * `pwd`, `salt` and `iterations`, as a Uint8Array.
   */
  async _hi(pwd, salt, iterations) {
    const keyLength = 32;
    const derived = await pbkdf2(pwd, salt, iterations, keyLength, 'sha256');
    return Uint8Array.from(derived);
  },
});
// Install the Node.js implementations of the `_net` hooks; every socket
// operation is delegated to a SocketAdapter instance.
Object.assign(_net, {
  /** Opens a TCP connection and resolves with a SocketAdapter. */
  connectTcp({ host, port, keepAlive }) {
    const options = { host, port, keepAlive };
    return SocketAdapter.connect(options);
  },

  /** Opens a connection to the given socket path and resolves with a SocketAdapter. */
  connectUnix({ path }) {
    const options = { path };
    return SocketAdapter.connect(options);
  },

  /** Reports whether `err.code` is one of the codes treated as reconnectable. */
  reconnectable(err) {
    switch (err?.code) {
      case 'ENOTFOUND':
      case 'ECONNREFUSED':
      case 'ECONNRESET':
        return true;
      default:
        return false;
    }
  },

  /** Upgrades the adapter's connection to TLS; resolves with a new adapter. */
  startTls(sockadapt, { hostname, caCerts }) {
    const upgraded = sockadapt.startTls(hostname, caCerts);
    return upgraded;
  },

  /** Delegates to the adapter's `read(out)`. */
  read(sockadapt, out) {
    const pending = sockadapt.read(out);
    return pending;
  },

  /** Delegates to the adapter's `write(data)`. */
  write(sockadapt, data) {
    const pending = sockadapt.write(data);
    return pending;
  },

  /** Closes the adapter if one was given; a null/undefined adapter is ignored. */
  closeNullable(sockadapt) {
    if (sockadapt === null || sockadapt === undefined) {
      return undefined;
    }
    return sockadapt.close();
  },
});
/**
 * Promise-based wrapper around a Node.js socket stream, exposing the
 * `read` / `write` / `startTls` / `close` operations used by the `_net` hooks.
 */
class SocketAdapter {
  /**
   * Connects with `net.connect(options)` and resolves with an adapter once
   * the socket emits 'connect'.
   * @param {object} options passed through to `net.connect`
   * @returns {Promise<SocketAdapter>}
   */
  static async connect(options) {
    const socket = net.connect(options);
    await once(socket, 'connect');
    return new this(socket);
  }

  /**
   * @param {object} socket stream to wrap; 'readable', 'end' and 'error'
   *   listeners are attached to it here.
   */
  constructor(socket) {
    // `Boolean` is used as a harmless callable placeholder until a real
    // promise resolver is installed by a pending read/write.
    this._readResume = Boolean; // noop
    this._writeResume = Boolean; // noop
    // Promise executors: they stash `resolve` so socket events can wake the waiter.
    this._readPauseAsync = resolve => this._readResume = resolve;
    this._writePauseAsync = resolve => this._writeResume = resolve;
    this._error = null;
    this._socket = socket;
    this._socket.on('readable', _ => this._readResume());
    this._socket.on('end', _ => this._readResume());
    this._socket.on('error', error => {
      // Remember the error and wake both waiters so they can rethrow it.
      this._error = error;
      this._readResume();
      this._writeResume();
    });
  }

  /**
   * Wraps the current socket in a TLS client connection.
   * @param {string} host passed as `host` to `tls.connect`
   * @param {*} ca passed as `ca` to `tls.createSecureContext`
   * @returns {Promise<SocketAdapter>} a new adapter over the TLS socket
   */
  async startTls(host, ca) {
    // https://nodejs.org/docs/latest-v14.x/api/tls.html#tls_tls_connect_options_callback
    const socket = this._socket;
    const secureContext = tls.createSecureContext({ ca });
    const tlsSocket = tls.connect({ socket, host, secureContext });
    await once(tlsSocket, 'secureConnect');
    // TODO check tlsSocket.authorized

    // if secure connection succeeded then we take underlying socket ownership,
    // otherwise underlying socket should be closed outside.
    tlsSocket.on('close', _ => socket.destroy());
    return new this.constructor(tlsSocket);
  }

  /**
   * Reads available bytes into `out`, waiting for data when none is buffered.
   * @param {Uint8Array} out destination buffer
   * @returns {Promise<number|null>} number of bytes copied into `out`, or
   *   `null` once the readable side has ended
   * @throws the error previously emitted by the socket, if any
   */
  async read(out) {
    let buf;
    for (;;) {
      if (this._error) throw this._error; // TODO callstack
      if (this._socket.readableEnded) return null;
      // Never ask for more than fits in `out`.
      const toRead = Math.min(out.length, this._socket.readableLength);
      buf = this._socket.read(toRead);

      if (buf?.length) break;
      // Nothing buffered: sleep until 'readable', 'end' or 'error' fires.
      if (!buf) await new Promise(this._readPauseAsync);
    }
    if (buf.length > out.length) { // should never happen
      throw Error('read more data than expected');
    }
    out.set(buf);
    return buf.length;
  }

  /**
   * Writes `data` to the socket and resolves once the write callback fires
   * (or an 'error' event wakes the waiter).
   * @param {Uint8Array} data bytes to send
   * @throws the socket error if one was emitted, otherwise the error
   *   reported to the write callback
   */
  async write(data) {
    // TODO assert Uint8Array
    // TODO need to copy data?
    if (this._error) throw this._error; // TODO callstack
    const p = new Promise(this._writePauseAsync);
    this._socket.write(data, this._writeResume);
    // The write callback resolves `p` with its error argument (if any);
    // the 'error' handler resolves it with undefined.
    const writeError = await p;
    if (this._error) throw this._error; // TODO callstack
    // A write on a socket that was destroyed without an 'error' event reports
    // failure only through the callback; do not pretend the data was sent.
    if (writeError) throw writeError;
  }

  // Disabled half-close sketch, kept for reference:
  // async closeWrite() {
  //   if (this._error) throw this._error; // TODO callstack
  //   const socket_end = promisify(cb => this._socket.end(cb));
  //   await socket_end();
  // }

  /** Destroys the underlying socket with a 'socket destroyed' error. */
  close() {
    this._socket.destroy(Error('socket destroyed'));
  }
}