diff --git a/packages/portalnetwork/src/wire/utp/Packets/PacketHeader.ts b/packages/portalnetwork/src/wire/utp/Packets/PacketHeader.ts index 1fa25f1eb..12ba81e8f 100644 --- a/packages/portalnetwork/src/wire/utp/Packets/PacketHeader.ts +++ b/packages/portalnetwork/src/wire/utp/Packets/PacketHeader.ts @@ -2,7 +2,7 @@ import { VERSION } from '../Utils/constants.js' import { SelectiveAckHeaderExtension } from './Extensions.js' -import type { Uint8, Uint16, Uint32 } from '../index.js' +import type { Uint16, Uint32, Uint8 } from '../index.js' import type { HeaderInput, ISelectiveAckHeaderInput, diff --git a/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts b/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts index c8c8ba073..d07ce25dc 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts @@ -64,7 +64,8 @@ export class ContentReader { } readPacket(payload: Uint8Array) { - this.nextDataNr!++ + // Wrap seqNr back to 0 when it exceeds 16-bit max integer + this.nextDataNr! = (this.nextDataNr! + 1) % 65536 this.bytes.push(...payload) this.logger.extend('BYTES')( `Current stream: ${this.bytes.length} / ${this.bytesExpected} bytes. ${this.bytesExpected - this.bytes.length} bytes till end of content.`, diff --git a/packages/portalnetwork/src/wire/utp/Socket/ContentWriter.ts b/packages/portalnetwork/src/wire/utp/Socket/ContentWriter.ts index 9ff0f6b0d..f4663b438 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/ContentWriter.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/ContentWriter.ts @@ -10,8 +10,8 @@ export class ContentWriter { content: Uint8Array writing: boolean sentChunks: number[] - dataChunks: Record - dataNrs: number[] + dataChunks: Array<[number, Uint8Array]> + constructor(socket: WriteSocket, content: Uint8Array, startingSeqNr: number, logger: Debugger) { this.socket = socket this.content = content @@ -19,22 +19,22 @@ export class ContentWriter { this.seqNr = startingSeqNr this.writing = false this.sentChunks = [] - this.dataNrs = [] this.logger = logger.extend('WRITING') - this.dataChunks = {} + this.dataChunks = [] } async write(): Promise { if (!this.writing) return - const chunks = Object.keys(this.dataChunks).length + const totalChunks = this.dataChunks.length let bytes: Uint8Array - if (this.sentChunks.length < chunks) { - bytes = this.dataChunks[this.seqNr] ?? [] - !this.sentChunks.includes(this.seqNr) && this.sentChunks.push(this.seqNr) + if (this.sentChunks.length < totalChunks) { + bytes = this.dataChunks[this.sentChunks.length][1] ?? [] + this.sentChunks.push(this.seqNr) this.logger( - `Sending ST-DATA ${this.seqNr - this.startingSeqNr + 1}/${chunks} -- SeqNr: ${this.seqNr}`, + `Sending ST-DATA ${this.sentChunks.length}/${totalChunks} -- SeqNr: ${this.seqNr}`, ) - this.seqNr = this.sentChunks.slice(-1)[0] + 1 + // Wrap seqNr back to 0 when it exceeds 16-bit max integer + this.seqNr = (this.sentChunks[this.sentChunks.length - 1] + 1) % 65536 await this.socket.sendDataPacket(bytes) return } @@ -48,17 +48,18 @@ export class ContentWriter { await this.write() } - chunk(): Record { + chunk(): [number, Uint8Array][] { let arrayMod = this.content const total = Math.ceil(this.content.length / BUFFER_SIZE) this.logger(`Preparing content for transfer as ${total} ${BUFFER_SIZE} byte chunks.`) - const dataChunks: Record = {} + const dataChunks = new Array<[number, Uint8Array]>(total) + let seqNr = this.startingSeqNr for (let i = 0; i < total; i++) { const start = 0 const end = arrayMod.length > 512 ? 512 : undefined - dataChunks[i + this.startingSeqNr] = arrayMod.subarray(start, end) + dataChunks[i] = [seqNr, arrayMod.subarray(start, end)] arrayMod = arrayMod.subarray(end) - this.dataNrs.push(i + this.startingSeqNr) + seqNr = (seqNr + 1) % 65536 // Wrap seqNr back to 0 when it exceeds 16-bit max integer } this.logger(`Ready to send ${total} Packets starting at SeqNr: ${this.startingSeqNr}`) return dataChunks diff --git a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts index 9a1fb3810..be81deaa5 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts @@ -106,6 +106,9 @@ export abstract class UtpSocket { extension, } opts.pType === PacketType.ST_DATA && this.seqNr++ + if (this.seqNr > 65535) { + this.seqNr = 0 + } return this.packetManager.createPacket(params) } diff --git a/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts index d100e0031..023754b6a 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/WriteSocket.ts @@ -60,7 +60,7 @@ export class WriteSocket extends UtpSocket { this.close() } compare(): boolean { - if (!this.ackNrs.includes(undefined) && this.ackNrs.length === this.writer!.dataNrs.length) { + if (!this.ackNrs.includes(undefined) && this.ackNrs.length === this.writer!.dataChunks.length) { return true } return false @@ -71,7 +71,7 @@ export class WriteSocket extends UtpSocket { this._clearTimeout() } logProgress() { - const needed = this.writer!.dataNrs.filter((n) => !this.ackNrs.includes(n)) + const needed = this.writer!.dataChunks.filter((n) => !this.ackNrs.includes(n[0])) this.logger( `AckNr's received (${this.ackNrs.length}/${ this.writer!.sentChunks.length @@ -89,9 +89,7 @@ export class WriteSocket extends UtpSocket { `) } updateAckNrs(ackNr: number) { - this.ackNrs = Object.keys(this.writer!.dataChunks) - .filter((n) => parseInt(n) <= ackNr) - .map((n) => parseInt(n)) + this.ackNrs = this.writer!.dataChunks.filter((n) => n[0] <= ackNr).map((n) => n[0]) } async sendDataPacket(bytes: Uint8Array): Promise { this.state = ConnectionState.Connected diff --git a/packages/portalnetwork/test/wire/utp/socket.spec.ts b/packages/portalnetwork/test/wire/utp/socket.spec.ts index 52a6111d5..c884e0a89 100644 --- a/packages/portalnetwork/test/wire/utp/socket.spec.ts +++ b/packages/portalnetwork/test/wire/utp/socket.spec.ts @@ -32,21 +32,21 @@ const _read = async (networkId: NetworkId): Promise => { networkId, ackNr: DEFAULT_RAND_ACKNR, seqNr: DEFAULT_RAND_SEQNR, - remoteAddress: '1234', + enr: 'enr:1234' as any, rcvId: readId, sndId: writeId, logger: debug('test'), type: UtpSocketType.READ, }) as ReadSocket } -const _write = async (networkId: NetworkId): Promise => { +const _write = async (networkId: NetworkId, seqNr?: number): Promise => { const client = await PortalNetwork.create({ bindAddress: '127.0.0.1' }) return createUtpSocket({ utp: new PortalNetworkUTP(client), networkId, ackNr: DEFAULT_RAND_ACKNR, - seqNr: DEFAULT_RAND_SEQNR, - remoteAddress: '1234', + seqNr: seqNr ?? DEFAULT_RAND_SEQNR, + enr: 'enr:1234' as any, rcvId: writeId, sndId: readId, logger: debug('test'), @@ -327,9 +327,24 @@ describe('uTP Socket Tests', async () => { s.setWriter(s.getSeqNr()) it('socket.compare()', () => { s.ackNrs = [0, 1, 2, 3, 4, 5] - s.writer!.dataNrs = [0, 1, 2, 3, 4, 5] + s.writer!.dataChunks = [ + [0, Uint8Array.from([111])], + [1, Uint8Array.from([222])], + [2, Uint8Array.from([333])], + [3, Uint8Array.from([444])], + [4, Uint8Array.from([555])], + [5, Uint8Array.from([666])], + ] assert.ok(s.compare(), 'socket.compare() returns true for matching ackNrs and dataNrs') - s.writer!.dataNrs = [0, 1, 2, 3, 4, 5, 6] + s.writer!.dataChunks = [ + [0, Uint8Array.from([111])], + [1, Uint8Array.from([222])], + [2, Uint8Array.from([333])], + [3, Uint8Array.from([444])], + [4, Uint8Array.from([555])], + [5, Uint8Array.from([666])], + [6, Uint8Array.from([777])], + ] assert.notOk(s.compare(), 'socket.compare() returns false for mismatched ackNrs and dataNrs') s.ackNrs = [0, 1, 2, 3, 4, 6, 5] assert.ok( @@ -378,3 +393,14 @@ describe('uTP Socket Tests', async () => { ) }) }) +describe('seqNr overflow', () => { + it('should reset seqNr to 0', async () => { + const s = await _write(NetworkId.HistoryNetwork, 65535) + s.logger = debug('test') + s.content = new Uint8Array(1024) + s.setWriter(s.getSeqNr()) + await s.writer?.write() + await s.writer?.write() + assert.equal(s.getSeqNr(), 1, 'seqNr should be reset to 0') + }) +}) diff --git a/packages/portalnetwork/test/wire/utp/utp.spec.ts b/packages/portalnetwork/test/wire/utp/utp.spec.ts index 651f85567..8a223e834 100644 --- a/packages/portalnetwork/test/wire/utp/utp.spec.ts +++ b/packages/portalnetwork/test/wire/utp/utp.spec.ts @@ -44,9 +44,9 @@ describe('uTP Reader/Writer tests', async () => { Math.ceil(sampleSize / 512), 'ContentWriter chunked', ) - const totalLength = Object.values(contentChunks).reduce((acc, chunk) => acc + chunk.length, 0) + const totalLength = contentChunks.reduce((acc, chunk) => acc + chunk[1].length, 0) assert.equal(totalLength, sampleSize, 'ContentWriter chunked all bytes') - const packets = Object.values(contentChunks).map((chunk, i) => { + const packets = contentChunks.map((chunk, i) => { return Packet.fromOpts({ header: { seqNr: i, @@ -59,10 +59,10 @@ describe('uTP Reader/Writer tests', async () => { timestampMicroseconds: 0, wndSize: 0, }, - payload: chunk, + payload: chunk[1], }) }) - assert.equal(packets.length, Object.values(contentChunks).length, 'Packets created') + assert.equal(packets.length, contentChunks.length, 'Packets created') let sent = 0 for (const [i, packet] of packets.entries()) { reader.addPacket(packet) @@ -87,9 +87,9 @@ describe('uTP Reader/Writer tests', async () => { Math.ceil(content.length / 512), 'ContentWriter chunked', ) - const totalLength = Object.values(contentChunks).reduce((acc, chunk) => acc + chunk.length, 0) + const totalLength = contentChunks.reduce((acc, chunk) => acc + chunk[1].length, 0) assert.equal(totalLength, content.length, 'ContentWriter chunked all bytes') - const packets = Object.values(contentChunks).map((chunk, i) => { + const packets = contentChunks.map((chunk, i) => { return Packet.fromOpts({ header: { seqNr: i, @@ -102,10 +102,10 @@ describe('uTP Reader/Writer tests', async () => { timestampMicroseconds: 0, wndSize: 0, }, - payload: chunk, + payload: chunk[1], }) }) - assert.equal(packets.length, Object.values(contentChunks).length, 'Packets created') + assert.equal(packets.length, contentChunks.length, 'Packets created') let sent = 0 for (const [i, packet] of packets.entries()) { reader.addPacket(packet)