Skip to content

Commit

Permalink
fix: remove legacy fetch (#465)
Browse files Browse the repository at this point in the history
## Description

Removes byline and legacy fetch from KFC.

## Related Issue

Fixes #457 

<!-- or -->

Relates to #

## Type of change

- [ ] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Other (security config, docs update, etc)

## Checklist before merging

- [x] Test, docs, adr added or updated as needed
- [x] [Contributor Guide
Steps](https://docs.pepr.dev/main/contribute/#submitting-a-pull-request)
followed

Signed-off-by: Case Wylie <[email protected]>
  • Loading branch information
cmwylie19 authored Nov 14, 2024
1 parent 377b977 commit 6914e99
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 99 deletions.
9 changes: 0 additions & 9 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"homepage": "https://github.com/defenseunicorns/kubernetes-fluent-client#readme",
"dependencies": {
"@kubernetes/client-node": "1.0.0-rc7",
"byline": "5.0.0",
"fast-json-patch": "3.1.1",
"http-status-codes": "2.3.0",
"node-fetch": "2.7.0",
Expand Down
99 changes: 10 additions & 89 deletions src/fluent/watch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@

import { createHash } from "crypto";
import { EventEmitter } from "events";
import byline from "byline";
import https from "https";
import legacyFetch, { RequestInit } from "node-fetch";
import { RequestInit } from "node-fetch";
import { Agent, fetch } from "undici";
import { fetch as wrappedFetch } from "../fetch";
import { GenericClass, KubernetesListObject } from "../types";
Expand Down Expand Up @@ -55,8 +54,6 @@ export type WatchCfg = {
relistIntervalSec?: number;
/** Max amount of seconds to go without receiving an event before reconciliation starts. Defaults to 300 (5 minutes). */
lastSeenLimitSeconds?: number;
/** Watch Mechansism */
useLegacyWatch?: boolean;
};

const NONE = 50;
Expand All @@ -70,7 +67,7 @@ export class Watcher<T extends GenericClass> {
#callback: WatchAction<T>;
#watchCfg: WatchCfg;
#latestRelistWindow: string = "";
#useLegacyWatch = false;

// Track the last time data was received
#lastSeenTime = NONE;
#lastSeenLimit: number;
Expand All @@ -83,7 +80,6 @@ export class Watcher<T extends GenericClass> {

// Create a stream to read the response body
#stream?: Readable;
#legacyStream?: byline.LineStream;

// Create an EventEmitter to emit events
#events = new EventEmitter();
Expand Down Expand Up @@ -127,9 +123,6 @@ export class Watcher<T extends GenericClass> {
// Set the resync interval to 10 minutes if not specified
watchCfg.lastSeenLimitSeconds ??= 600;

// Set the watch mechanism
this.#useLegacyWatch = watchCfg.useLegacyWatch || false;

// Set the last seen limit to the resync interval
this.#lastSeenLimit = watchCfg.lastSeenLimitSeconds * 1000;

Expand Down Expand Up @@ -169,23 +162,17 @@ export class Watcher<T extends GenericClass> {
*/
public async start(): Promise<AbortController> {
this.#events.emit(WatchEvent.INIT_CACHE_MISS, this.#latestRelistWindow);
if (this.#useLegacyWatch) {
await this.#legacyWatch();
} else {
await this.#watch();
}
await this.#watch();

return this.#abortController;
}

/** Close the watch. Also available on the AbortController returned by {@link Watcher.start}. */
public close() {
clearInterval(this.$relistTimer);
clearInterval(this.#resyncTimer);
if (this.#useLegacyWatch) {
this.#legacyStreamCleanup();
} else {
this.#streamCleanup();
}

this.#streamCleanup();
this.#abortController.abort();
}

Expand Down Expand Up @@ -417,58 +404,6 @@ export class Watcher<T extends GenericClass> {
this.#events.emit(WatchEvent.DATA_ERROR, err);
}
};
/** node-fetch watch */
#legacyWatch = async () => {
try {
// Start with a list operation
await this.#list();

// Build the URL and request options
const { opts, url } = await this.#buildURL(true, this.#resourceVersion);

// Create a stream to read the response body
this.#legacyStream = byline.createStream();

// Bind the stream events
this.#legacyStream.on("error", this.#errHandler);
this.#legacyStream.on("close", this.#legacyStreamCleanup);
this.#legacyStream.on("finish", this.#legacyStreamCleanup);

// Make the actual request
const response = await legacyFetch(url, { ...opts });

// If the request is successful, start listening for events
if (response.ok) {
// Reset the pending reconnect flag
this.#pendingReconnect = false;

this.#events.emit(WatchEvent.CONNECT, url.pathname);

const { body } = response;

// Reset the retry count
this.#resyncFailureCount = 0;
this.#events.emit(WatchEvent.INC_RESYNC_FAILURE_COUNT, this.#resyncFailureCount);

// Listen for events and call the callback function
this.#legacyStream.on("data", async line => {
await this.#processLine(line, this.#process);
});

// Bind the body events
body.on("error", this.#errHandler);
body.on("close", this.#legacyStreamCleanup);
body.on("finish", this.#legacyStreamCleanup);

// Pipe the response body to the stream
body.pipe(this.#legacyStream);
} else {
throw new Error(`watch connect failed: ${response.status} ${response.statusText}`);
}
} catch (e) {
void this.#errHandler(e);
}
};

static getHTTPSAgent = (opts: RequestInit) => {
// In cluster there will be agent - testing or dev no
Expand Down Expand Up @@ -595,12 +530,7 @@ export class Watcher<T extends GenericClass> {
} else {
this.#pendingReconnect = true;
this.#events.emit(WatchEvent.RECONNECT, this.#resyncFailureCount);
if (this.#useLegacyWatch) {
this.#legacyStreamCleanup();
void this.#legacyWatch();
} else {
this.#cleanupAndReconnect();
}
this.#cleanupAndReconnect();
}
} else {
// Otherwise, call the finally function if it exists
Expand All @@ -623,11 +553,9 @@ export class Watcher<T extends GenericClass> {
case "AbortError":
clearInterval(this.$relistTimer);
clearInterval(this.#resyncTimer);
if (this.#useLegacyWatch) {
this.#legacyStreamCleanup();
} else {
this.#streamCleanup();
}

this.#streamCleanup();

this.#events.emit(WatchEvent.ABORT, err);
return;

Expand Down Expand Up @@ -659,11 +587,4 @@ export class Watcher<T extends GenericClass> {
this.#stream.destroy();
}
};

#legacyStreamCleanup = () => {
if (this.#legacyStream) {
this.#legacyStream.removeAllListeners();
this.#legacyStream.destroy();
}
};
}

0 comments on commit 6914e99

Please sign in to comment.