diff --git a/src/fluent/types.ts b/src/fluent/types.ts index 822b143..4ce62ea 100644 --- a/src/fluent/types.ts +++ b/src/fluent/types.ts @@ -7,6 +7,22 @@ import type { PartialDeep } from "type-fest"; import { GenericClass, GroupVersionKind } from "../types"; import { WatchCfg, Watcher } from "./watch"; +import https from "https"; +import { SecureClientSessionOptions } from "http2"; +/** + * Agent options for the the http2Watch + */ +export type AgentOptions = Pick< + SecureClientSessionOptions, + "ca" | "cert" | "key" | "rejectUnauthorized" +>; + +/** + * Options for the http2Watch + */ +export interface Options { + agent?: https.Agent & { options?: AgentOptions }; +} /** * The Phase matched when using the K8s Watch API. diff --git a/src/fluent/watch.ts b/src/fluent/watch.ts index a301738..7943a60 100644 --- a/src/fluent/watch.ts +++ b/src/fluent/watch.ts @@ -9,7 +9,7 @@ import http2 from "http2"; import fetch from "node-fetch"; import { fetch as wrappedFetch } from "../fetch"; import { GenericClass, KubernetesListObject } from "../types"; -import { Filters, WatchAction, WatchPhase } from "./types"; +import { Filters, WatchAction, WatchPhase, Options, AgentOptions } from "./types"; import { k8sCfg, pathBuilder } from "./utils"; import fs from "fs"; @@ -469,6 +469,44 @@ export class Watcher { } }; + // Configure the agent options for the HTTP/2 client + static #getAgentOptions(opts: Options) { + if (opts.agent && opts.agent instanceof https.Agent) { + return { + key: opts.agent.options.key, + cert: opts.agent.options.cert, + ca: opts.agent.options.ca, + rejectUnauthorized: false, + }; + } + return undefined; + } + + // Create an HTTP/2 client + static #createHttp2Client(origin: string, agentOptions?: AgentOptions) { + return http2.connect(origin, { + ca: agentOptions?.ca, + cert: agentOptions?.cert, + key: agentOptions?.key, + rejectUnauthorized: agentOptions?.rejectUnauthorized, + }); + } + + // Generate the request headers for the HTTP/2 request + #generateRequestHeaders = async (url: URL) => { + const token = await this.#getToken(); + const headers: Record = { + ":method": "GET", + ":path": url.pathname + url.search, + "content-type": "application/json", + "user-agent": "kubernetes-fluent-client", + }; + if (token) { + headers["Authorization"] = `Bearer ${token}`; + } + return headers; + }; + /** * Watch for changes to the resource. */ @@ -479,92 +517,31 @@ export class Watcher { // Build the URL and request options const { opts, url } = await this.#buildURL(true, this.#resourceVersion); - let agentOptions; - - if (opts.agent && opts.agent instanceof https.Agent) { - agentOptions = { - key: opts.agent.options.key, - cert: opts.agent.options.cert, - ca: opts.agent.options.ca, - rejectUnauthorized: false, - }; - } + const agentOptions = Watcher.#getAgentOptions(opts as Options); // HTTP/2 client connection setup - const client = http2.connect(url.origin, { - ca: agentOptions?.ca, - cert: agentOptions?.cert, - key: agentOptions?.key, - rejectUnauthorized: agentOptions?.rejectUnauthorized, + const client = Watcher.#createHttp2Client(url.origin, agentOptions); + + // Handle client connection errors + client.on("error", err => { + this.#events.emit(WatchEvent.NETWORK_ERROR, err); + this.#streamCleanup(client); + this.#scheduleReconnect(); }); // Set up headers for the HTTP/2 request - const token = await this.#getToken(); - const headers: Record = { - ":method": "GET", - ":path": url.pathname + url.search, - "content-type": "application/json", - "user-agent": "kubernetes-fluent-client", - }; - - if (token) { - headers["Authorization"] = `Bearer ${token}`; - } + const headers = await this.#generateRequestHeaders(url); // Make the HTTP/2 request const req = client.request(headers); - req.setEncoding("utf8"); - let buffer = ""; - - // Handle response data - req.on("response", headers => { - const statusCode = headers[":status"]; - - if (statusCode && statusCode >= 200 && statusCode < 300) { - this.#pendingReconnect = false; - this.#events.emit(WatchEvent.CONNECT, url.pathname); - - // Reset the retry count - this.#resyncFailureCount = 0; - this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount); - - req.on("data", async chunk => { - try { - buffer += chunk; - const lines = buffer.split("\n"); - // Avoid Watch event data_error received. Unexpected end of JSON input. - buffer = lines.pop()!; - - for (const line of lines) { - await this.#processLine(line, this.#process); - } - } catch (err) { - void this.#errHandler(err); - } - }); - - req.on("end", () => { - client.close(); - this.#streamCleanup(); - }); - - req.on("close", () => { - client.close(); - this.#streamCleanup(); - }); - - req.on("error", err => { - void this.#errHandler(err); - }); - } else { - const statusMessage = headers[":status-text"] || "Unknown"; - throw new Error(`watch connect failed: ${statusCode} ${statusMessage}`); - } - }); - req.on("error", err => { - void this.#errHandler(err); + // Handler events for the HTTP/2 request + this.#handleHttp2Request(req, client); + + // Handle abort signal + this.#abortController.signal.addEventListener("abort", () => { + this.#streamCleanup(client); }); } catch (e) { void this.#errHandler(e); @@ -628,6 +605,7 @@ export class Watcher { clearInterval(this.$relistTimer); clearInterval(this.#resyncTimer); this.#streamCleanup(); + this.#scheduleReconnect(); this.#events.emit(WatchEvent.ABORT, err); return; @@ -645,15 +623,90 @@ export class Watcher { // Force a resync this.#lastSeenTime = OVERRIDE; }; + /** + * + * @param req - the request stream + * @param client - the client session + */ + #handleHttp2Request(req: http2.ClientHttp2Stream, client: http2.ClientHttp2Session) { + let buffer = ""; + + req.on("response", headers => { + const statusCode = headers[":status"]; + if (statusCode && statusCode >= 200 && statusCode < 300) { + this.#onWatchConnected(); + } else { + this.#cleanupAndReconnect(client, new Error(`watch connect failed: ${statusCode}`)); + } + }); - /** Cleanup the stream and listeners. */ - #streamCleanup = () => { + req.on("data", chunk => { + buffer += chunk; + const lines = buffer.split("\n"); + buffer = lines.pop() || ""; // Keep any incomplete line for the next chunk + + lines.forEach(line => { + void this.#processLine(line, this.#process); + }); + }); + + req.on("end", () => this.#cleanupAndReconnect(client)); + req.on("close", () => this.#cleanupAndReconnect(client)); + req.on("error", error => this.#errHandler(error)); + } + + /** Schedules a reconnect with a delay to prevent rapid reconnections. */ + #scheduleReconnect() { + const jitter = Math.floor(Math.random() * 1000); + const delay = (this.#watchCfg.resyncDelaySec ?? 5) * 1000 + jitter; + + setTimeout(() => { + this.#events.emit(WatchEvent.RECONNECT, this.#resyncFailureCount); + void this.#http2Watch(); + }, delay); + } + + /** + * Handle a successful connection to the watch. + */ + #onWatchConnected() { + this.#pendingReconnect = false; + this.#events.emit(WatchEvent.CONNECT); + + // Reset the retry count + this.#resyncFailureCount = 0; + this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount); + } + + /** + * Cleanup the stream and listeners. + * + * @param client - the client session + */ + #streamCleanup = (client?: http2.ClientHttp2Session) => { if (this.#stream) { this.#stream.removeAllListeners(); this.#stream.destroy(); } - if (this.#useHTTP2) { - void this.#http2Watch(); + if (client) { + client.close(); } }; + + /** + * Cleanup the stream and listeners and reconnect. + * + * @param client - the client session + * @param error - the error that occurred + */ + #cleanupAndReconnect(client: http2.ClientHttp2Session, error?: Error) { + this.#streamCleanup(client); + + if (error) { + this.#events.emit(WatchEvent.NETWORK_ERROR, error); + void this.#errHandler(error); + } + + this.#scheduleReconnect(); + } }