Skip to content

Commit

Permalink
feat: add smtp headers manipulation
Browse files Browse the repository at this point in the history
  • Loading branch information
loopingz committed Nov 1, 2024
1 parent 3b8ff99 commit 8a8eea0
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 0 deletions.
57 changes: 57 additions & 0 deletions src/headers_transformer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { suite, test } from "@testdeck/mocha";
import * as assert from "assert";
import { pipeline } from "node:stream/promises";
import { Writable } from "node:stream";
import { createReadStream } from "node:fs";
import { HeadersTransform } from "./headers_transformer";

class BufferWritable extends Writable {
buffer: Buffer;

constructor(options?) {
super(options);
this.buffer = Buffer.alloc(0); // Initialize an empty buffer
}

_write(chunk, encoding, callback) {
// Concatenate the incoming chunk to the buffer
this.buffer = Buffer.concat([this.buffer, chunk]);
callback();
}

toString() {
return this.buffer.toString();
}
}

@suite
class HeadersTransformerTest {
TEST_FILE = import.meta.dirname + "/../tests/data-headers.txt";

@test
async normal() {
await this.test(createReadStream(this.TEST_FILE));
}
@test
async chunked() {
await this.test(createReadStream(this.TEST_FILE, { highWaterMark: 4 }));
}

async test(readable: NodeJS.ReadableStream) {
const writable = new BufferWritable();
await pipeline(
readable,
new HeadersTransform({ "-X-Test": "", "X-Received": "Add", "?X-Upsert-1": "Value", "?X-Upsert-2": "Value2" }),
writable
);
const wr = writable.toString();

assert.ok(!wr.includes("X-Test: plop"), "Should remove X-Test");
assert.ok(wr.includes("X-Received: whatever"), "Should keep X-Received");
assert.ok(wr.includes("X-Received: Add"), "Should add X-Received");
assert.ok(wr.includes("X-Upsert-2: Value2"), "Should upsert X-Upsert-2");
assert.ok(!wr.includes("X-Upsert-1: Value"), "Should not upsert X-Upsert-1");
assert.ok(wr.includes("X-Upsert-1: test"), "Should keep X-Upsert-1");
assert.ok(wr.includes("X-Test: still here"), "Should keep X-Test");
}
}
115 changes: 115 additions & 0 deletions src/headers_transformer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import { Transform, TransformOptions } from "node:stream";

/**
* Configuration for mail headers
*
* If key is prefixed with a - then the header is removed
* If key is prefixed with a ? then the header is added only if not present
*/
export type HeadersTransformConfig = {
[key: string]: string;
};

/**
* Inject or remove headers from a DATA stream
*/
export class HeadersTransform extends Transform {
previousChunk = "";
headersDone = false;
headersToRemove: string[] = [];
headersToUpsert: Set<string>;

constructor(
protected headers: HeadersTransformConfig,
options?: TransformOptions
) {
super(options);
this.headersToRemove = Object.keys(headers)
.filter(key => key.startsWith("-"))
.map(key => key.substring(1));
this.headersToUpsert = new Set(
Object.keys(headers)
.filter(key => key.startsWith("?"))
.map(key => key.substring(1))
);
}

/**
* Check if the header should be included and also remember headers seen
*
* @param line
* @returns
*/
includeHeader(line: string): boolean {
const header = line.split(":")[0];
// Found header no need to add
this.headersToUpsert.delete(header);
if (this.headersToRemove.includes(header)) {
return false;
}
return true;
}

/**
* Add headers to the stream
*/
addHeaders() {
for (let key in this.headers) {
if (key.startsWith("-")) {
continue;
}
if (key.startsWith("?")) {
if (this.headersToUpsert.has(key.substring(1))) {
this.push(`${key.substring(1)}: ${this.headers[key]}\n`);
}
continue;
}
this.push(`${key}: ${this.headers[key]}\n`);
}
}

/**
* Override
* @param chunk
* @param encoding
* @param callback
* @returns
*/
_transform(chunk, encoding, callback) {
// Do not filter headers if headers are already done
if (this.headersDone) {
this.push(chunk);
callback();
return;
}
const currentChunk = this.previousChunk + chunk.toString();
const lines = currentChunk.split("\n");
// Check if the headers are done
this.headersDone = currentChunk.includes("\n\n");
// We found an empty line, headers are done
if (this.headersDone) {
const headersEnd = lines.findIndex(line => line === "");
// We filter the remaining headers
this.push(
lines
.slice(0, headersEnd)
.filter(line => this.includeHeader(line))
.join("\n") + "\n"
);
this.addHeaders();
this.push(lines.slice(headersEnd).join("\n"));
callback();
return;
}

// Check if the last line is complete
if (currentChunk.endsWith("\n")) {
this.previousChunk = "";
} else {
this.previousChunk = lines.pop();
}
const filteredLines = lines.filter(line => this.includeHeader(line));
this.push(filteredLines.join("\n"));
callback();
}
}
11 changes: 11 additions & 0 deletions tests/data-headers.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
X-Test: plop
X-1: 1
X-1-Header: Previous header is there to test chunk break on new line
X-Upsert-1: test
X-Received: whatever

My data
Should not be filtered
X-Test: still here

.

0 comments on commit 8a8eea0

Please sign in to comment.