Skip to content

Commit

Permalink
fix: reduce memory footprint and refactor (#425)
Browse files Browse the repository at this point in the history
## Description

Reduces the memory footprint of http2Watch and includes refactoring

Soak Test:
https://github.com/defenseunicorns/pepr/actions/runs/11355812600/job/31585909300

## Related Issue

Fixes #424 

<!-- or -->

Relates to #

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Other (security config, docs update, etc)

## Checklist before merging

- [x] Test, docs, adr added or updated as needed
- [x] [Contributor Guide
Steps](https://docs.pepr.dev/main/contribute/#submitting-a-pull-request)
followed

Signed-off-by: Case Wylie <[email protected]>
  • Loading branch information
cmwylie19 authored Oct 17, 2024
1 parent 9394124 commit 2f0fb7b
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 81 deletions.
16 changes: 16 additions & 0 deletions src/fluent/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,22 @@ import type { PartialDeep } from "type-fest";

import { GenericClass, GroupVersionKind } from "../types";
import { WatchCfg, Watcher } from "./watch";
import https from "https";
import { SecureClientSessionOptions } from "http2";
/**
* Agent options for the the http2Watch
*/
export type AgentOptions = Pick<
SecureClientSessionOptions,
"ca" | "cert" | "key" | "rejectUnauthorized"
>;

/**
* Options for the http2Watch
*/
export interface Options {
agent?: https.Agent & { options?: AgentOptions };
}

/**
* The Phase matched when using the K8s Watch API.
Expand Down
215 changes: 134 additions & 81 deletions src/fluent/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import http2 from "http2";
import fetch from "node-fetch";
import { fetch as wrappedFetch } from "../fetch";
import { GenericClass, KubernetesListObject } from "../types";
import { Filters, WatchAction, WatchPhase } from "./types";
import { Filters, WatchAction, WatchPhase, Options, AgentOptions } from "./types";
import { k8sCfg, pathBuilder } from "./utils";
import fs from "fs";

Expand Down Expand Up @@ -469,6 +469,44 @@ export class Watcher<T extends GenericClass> {
}
};

// Configure the agent options for the HTTP/2 client
static #getAgentOptions(opts: Options) {
if (opts.agent && opts.agent instanceof https.Agent) {
return {
key: opts.agent.options.key,
cert: opts.agent.options.cert,
ca: opts.agent.options.ca,
rejectUnauthorized: false,
};
}
return undefined;
}

// Create an HTTP/2 client
static #createHttp2Client(origin: string, agentOptions?: AgentOptions) {
return http2.connect(origin, {
ca: agentOptions?.ca,
cert: agentOptions?.cert,
key: agentOptions?.key,
rejectUnauthorized: agentOptions?.rejectUnauthorized,
});
}

// Generate the request headers for the HTTP/2 request
#generateRequestHeaders = async (url: URL) => {
const token = await this.#getToken();
const headers: Record<string, string> = {
":method": "GET",
":path": url.pathname + url.search,
"content-type": "application/json",
"user-agent": "kubernetes-fluent-client",
};
if (token) {
headers["Authorization"] = `Bearer ${token}`;
}
return headers;
};

/**
* Watch for changes to the resource.
*/
Expand All @@ -479,92 +517,31 @@ export class Watcher<T extends GenericClass> {

// Build the URL and request options
const { opts, url } = await this.#buildURL(true, this.#resourceVersion);
let agentOptions;

if (opts.agent && opts.agent instanceof https.Agent) {
agentOptions = {
key: opts.agent.options.key,
cert: opts.agent.options.cert,
ca: opts.agent.options.ca,
rejectUnauthorized: false,
};
}
const agentOptions = Watcher.#getAgentOptions(opts as Options);

// HTTP/2 client connection setup
const client = http2.connect(url.origin, {
ca: agentOptions?.ca,
cert: agentOptions?.cert,
key: agentOptions?.key,
rejectUnauthorized: agentOptions?.rejectUnauthorized,
const client = Watcher.#createHttp2Client(url.origin, agentOptions);

// Handle client connection errors
client.on("error", err => {
this.#events.emit(WatchEvent.NETWORK_ERROR, err);
this.#streamCleanup(client);
this.#scheduleReconnect();
});

// Set up headers for the HTTP/2 request
const token = await this.#getToken();
const headers: Record<string, string> = {
":method": "GET",
":path": url.pathname + url.search,
"content-type": "application/json",
"user-agent": "kubernetes-fluent-client",
};

if (token) {
headers["Authorization"] = `Bearer ${token}`;
}
const headers = await this.#generateRequestHeaders(url);

// Make the HTTP/2 request
const req = client.request(headers);

req.setEncoding("utf8");

let buffer = "";

// Handle response data
req.on("response", headers => {
const statusCode = headers[":status"];

if (statusCode && statusCode >= 200 && statusCode < 300) {
this.#pendingReconnect = false;
this.#events.emit(WatchEvent.CONNECT, url.pathname);

// Reset the retry count
this.#resyncFailureCount = 0;
this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount);

req.on("data", async chunk => {
try {
buffer += chunk;
const lines = buffer.split("\n");
// Avoid Watch event data_error received. Unexpected end of JSON input.
buffer = lines.pop()!;

for (const line of lines) {
await this.#processLine(line, this.#process);
}
} catch (err) {
void this.#errHandler(err);
}
});

req.on("end", () => {
client.close();
this.#streamCleanup();
});

req.on("close", () => {
client.close();
this.#streamCleanup();
});

req.on("error", err => {
void this.#errHandler(err);
});
} else {
const statusMessage = headers[":status-text"] || "Unknown";
throw new Error(`watch connect failed: ${statusCode} ${statusMessage}`);
}
});
req.on("error", err => {
void this.#errHandler(err);
// Handler events for the HTTP/2 request
this.#handleHttp2Request(req, client);

// Handle abort signal
this.#abortController.signal.addEventListener("abort", () => {
this.#streamCleanup(client);
});
} catch (e) {
void this.#errHandler(e);
Expand Down Expand Up @@ -628,6 +605,7 @@ export class Watcher<T extends GenericClass> {
clearInterval(this.$relistTimer);
clearInterval(this.#resyncTimer);
this.#streamCleanup();
this.#scheduleReconnect();
this.#events.emit(WatchEvent.ABORT, err);
return;

Expand All @@ -645,15 +623,90 @@ export class Watcher<T extends GenericClass> {
// Force a resync
this.#lastSeenTime = OVERRIDE;
};
/**
*
* @param req - the request stream
* @param client - the client session
*/
#handleHttp2Request(req: http2.ClientHttp2Stream, client: http2.ClientHttp2Session) {
let buffer = "";

req.on("response", headers => {
const statusCode = headers[":status"];
if (statusCode && statusCode >= 200 && statusCode < 300) {
this.#onWatchConnected();
} else {
this.#cleanupAndReconnect(client, new Error(`watch connect failed: ${statusCode}`));
}
});

/** Cleanup the stream and listeners. */
#streamCleanup = () => {
req.on("data", chunk => {
buffer += chunk;
const lines = buffer.split("\n");
buffer = lines.pop() || ""; // Keep any incomplete line for the next chunk

lines.forEach(line => {
void this.#processLine(line, this.#process);
});
});

req.on("end", () => this.#cleanupAndReconnect(client));
req.on("close", () => this.#cleanupAndReconnect(client));
req.on("error", error => this.#errHandler(error));
}

/** Schedules a reconnect with a delay to prevent rapid reconnections. */
#scheduleReconnect() {
const jitter = Math.floor(Math.random() * 1000);
const delay = (this.#watchCfg.resyncDelaySec ?? 5) * 1000 + jitter;

setTimeout(() => {
this.#events.emit(WatchEvent.RECONNECT, this.#resyncFailureCount);
void this.#http2Watch();
}, delay);
}

/**
* Handle a successful connection to the watch.
*/
#onWatchConnected() {
this.#pendingReconnect = false;
this.#events.emit(WatchEvent.CONNECT);

// Reset the retry count
this.#resyncFailureCount = 0;
this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount);
}

/**
* Cleanup the stream and listeners.
*
* @param client - the client session
*/
#streamCleanup = (client?: http2.ClientHttp2Session) => {
if (this.#stream) {
this.#stream.removeAllListeners();
this.#stream.destroy();
}
if (this.#useHTTP2) {
void this.#http2Watch();
if (client) {
client.close();
}
};

/**
* Cleanup the stream and listeners and reconnect.
*
* @param client - the client session
* @param error - the error that occurred
*/
#cleanupAndReconnect(client: http2.ClientHttp2Session, error?: Error) {
this.#streamCleanup(client);

if (error) {
this.#events.emit(WatchEvent.NETWORK_ERROR, error);
void this.#errHandler(error);
}

this.#scheduleReconnect();
}
}

0 comments on commit 2f0fb7b

Please sign in to comment.