diff --git a/package-lock.json b/package-lock.json index a174b0b..a739236 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,6 @@ "license": "Apache-2.0", "dependencies": { "@kubernetes/client-node": "1.0.0-rc7", - "byline": "5.0.0", "fast-json-patch": "3.1.1", "http-status-codes": "2.3.0", "node-fetch": "2.7.0", @@ -3318,14 +3317,6 @@ "integrity": "sha512-E+XQCRwSbaaiChtv6k6Dwgc+bx+Bs6vuKJHHl5kox/BaKbhiXzqQOwK4cO22yElGp2OCmjwVhT3HmxgyPGnJfQ==", "dev": true }, - "node_modules/byline": { - "version": "5.0.0", - "resolved": "https://registry.npmjs.org/byline/-/byline-5.0.0.tgz", - "integrity": "sha512-s6webAy+R4SR8XVuJWt2V2rGvhnrhxN+9S15GNuTK3wKPOXFF6RNc+8ug2XhH+2s4f+uudG4kUVYmYOQWL2g0Q==", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/callsites": { "version": "3.1.0", "resolved": "https://registry.npmjs.org/callsites/-/callsites-3.1.0.tgz", diff --git a/package.json b/package.json index ef9b923..e5fb74b 100644 --- a/package.json +++ b/package.json @@ -38,7 +38,6 @@ "homepage": "https://github.com/defenseunicorns/kubernetes-fluent-client#readme", "dependencies": { "@kubernetes/client-node": "1.0.0-rc7", - "byline": "5.0.0", "fast-json-patch": "3.1.1", "http-status-codes": "2.3.0", "node-fetch": "2.7.0", diff --git a/src/fluent/watch.ts b/src/fluent/watch.ts index 6f919fb..96ae0af 100644 --- a/src/fluent/watch.ts +++ b/src/fluent/watch.ts @@ -3,9 +3,8 @@ import { createHash } from "crypto"; import { EventEmitter } from "events"; -import byline from "byline"; import https from "https"; -import legacyFetch, { RequestInit } from "node-fetch"; +import { RequestInit } from "node-fetch"; import { Agent, fetch } from "undici"; import { fetch as wrappedFetch } from "../fetch"; import { GenericClass, KubernetesListObject } from "../types"; @@ -55,8 +54,6 @@ export type WatchCfg = { relistIntervalSec?: number; /** Max amount of seconds to go without receiving an event before reconciliation starts. Defaults to 300 (5 minutes). */ lastSeenLimitSeconds?: number; - /** Watch Mechansism */ - useLegacyWatch?: boolean; }; const NONE = 50; @@ -70,7 +67,7 @@ export class Watcher { #callback: WatchAction; #watchCfg: WatchCfg; #latestRelistWindow: string = ""; - #useLegacyWatch = false; + // Track the last time data was received #lastSeenTime = NONE; #lastSeenLimit: number; @@ -83,7 +80,6 @@ export class Watcher { // Create a stream to read the response body #stream?: Readable; - #legacyStream?: byline.LineStream; // Create an EventEmitter to emit events #events = new EventEmitter(); @@ -127,9 +123,6 @@ export class Watcher { // Set the resync interval to 10 minutes if not specified watchCfg.lastSeenLimitSeconds ??= 600; - // Set the watch mechanism - this.#useLegacyWatch = watchCfg.useLegacyWatch || false; - // Set the last seen limit to the resync interval this.#lastSeenLimit = watchCfg.lastSeenLimitSeconds * 1000; @@ -169,11 +162,8 @@ export class Watcher { */ public async start(): Promise { this.#events.emit(WatchEvent.INIT_CACHE_MISS, this.#latestRelistWindow); - if (this.#useLegacyWatch) { - await this.#legacyWatch(); - } else { - await this.#watch(); - } + await this.#watch(); + return this.#abortController; } @@ -181,11 +171,8 @@ export class Watcher { public close() { clearInterval(this.$relistTimer); clearInterval(this.#resyncTimer); - if (this.#useLegacyWatch) { - this.#legacyStreamCleanup(); - } else { - this.#streamCleanup(); - } + + this.#streamCleanup(); this.#abortController.abort(); } @@ -417,58 +404,6 @@ export class Watcher { this.#events.emit(WatchEvent.DATA_ERROR, err); } }; - /** node-fetch watch */ - #legacyWatch = async () => { - try { - // Start with a list operation - await this.#list(); - - // Build the URL and request options - const { opts, url } = await this.#buildURL(true, this.#resourceVersion); - - // Create a stream to read the response body - this.#legacyStream = byline.createStream(); - - // Bind the stream events - this.#legacyStream.on("error", this.#errHandler); - this.#legacyStream.on("close", this.#legacyStreamCleanup); - this.#legacyStream.on("finish", this.#legacyStreamCleanup); - - // Make the actual request - const response = await legacyFetch(url, { ...opts }); - - // If the request is successful, start listening for events - if (response.ok) { - // Reset the pending reconnect flag - this.#pendingReconnect = false; - - this.#events.emit(WatchEvent.CONNECT, url.pathname); - - const { body } = response; - - // Reset the retry count - this.#resyncFailureCount = 0; - this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount); - - // Listen for events and call the callback function - this.#legacyStream.on("data", async line => { - await this.#processLine(line, this.#process); - }); - - // Bind the body events - body.on("error", this.#errHandler); - body.on("close", this.#legacyStreamCleanup); - body.on("finish", this.#legacyStreamCleanup); - - // Pipe the response body to the stream - body.pipe(this.#legacyStream); - } else { - throw new Error(`watch connect failed: ${response.status} ${response.statusText}`); - } - } catch (e) { - void this.#errHandler(e); - } - }; static getHTTPSAgent = (opts: RequestInit) => { // In cluster there will be agent - testing or dev no @@ -595,12 +530,7 @@ export class Watcher { } else { this.#pendingReconnect = true; this.#events.emit(WatchEvent.RECONNECT, this.#resyncFailureCount); - if (this.#useLegacyWatch) { - this.#legacyStreamCleanup(); - void this.#legacyWatch(); - } else { - this.#cleanupAndReconnect(); - } + this.#cleanupAndReconnect(); } } else { // Otherwise, call the finally function if it exists @@ -623,11 +553,9 @@ export class Watcher { case "AbortError": clearInterval(this.$relistTimer); clearInterval(this.#resyncTimer); - if (this.#useLegacyWatch) { - this.#legacyStreamCleanup(); - } else { - this.#streamCleanup(); - } + + this.#streamCleanup(); + this.#events.emit(WatchEvent.ABORT, err); return; @@ -659,11 +587,4 @@ export class Watcher { this.#stream.destroy(); } }; - - #legacyStreamCleanup = () => { - if (this.#legacyStream) { - this.#legacyStream.removeAllListeners(); - this.#legacyStream.destroy(); - } - }; }