Skip to content

Commit

Permalink
reset core function from main
Browse files Browse the repository at this point in the history
  • Loading branch information
benjaminpaige committed Jan 3, 2025
1 parent 6671035 commit b535756
Show file tree
Hide file tree
Showing 8 changed files with 90 additions and 76 deletions.
4 changes: 2 additions & 2 deletions lib/lambda/getSubTypes.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import { response } from "../libs/handler-lib";
import * as os from "../libs/opensearch-lib";
import { response } from "libs/handler-lib";
import * as os from "libs/opensearch-lib";

type GetSubTypesBody = {
authorityId: string;
Expand Down
2 changes: 1 addition & 1 deletion lib/lambda/getUploadUrl.test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { describe, it, expect, vi, beforeEach, Mock } from "vitest";
import { APIGatewayEvent } from "aws-lambda";
import { handler } from "./getUploadUrl";
import { response } from "../libs/handler-lib";
import { response } from "libs/handler-lib";
import { getSignedUrl } from "@aws-sdk/s3-request-presigner";
import { v4 as uuidv4 } from "uuid";

Expand Down
4 changes: 2 additions & 2 deletions lib/lambda/itemExists.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import { itemExists } from "../libs/api/package";
import { response } from "../libs/handler-lib";
import { itemExists } from "libs/api/package";
import { response } from "libs/handler-lib";

export const handler = async (event: APIGatewayEvent) => {
if (!event.body) {
Expand Down
41 changes: 32 additions & 9 deletions lib/lambda/runReindex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { describe, it, expect, vi, beforeEach } from "vitest";
import { send, SUCCESS, FAILED } from "cfn-response-async";
import { SFNClient, StartExecutionCommand } from "@aws-sdk/client-sfn";
import { handler } from "./runReindex";
import { Context } from "aws-lambda";

vi.mock("cfn-response-async", () => ({
send: vi.fn(),
Expand Down Expand Up @@ -44,7 +43,7 @@ describe("CloudFormation Custom Resource Handler", () => {
send: sendMock,
}));

await handler(mockEvent, {} as Context, () => {});
await handler(mockEvent, mockContext);

expect(SFNClient).toHaveBeenCalled();
expect(StartExecutionCommand).toHaveBeenCalledWith({
Expand All @@ -55,7 +54,13 @@ describe("CloudFormation Custom Resource Handler", () => {
}),
});
expect(sendMock).toHaveBeenCalled();
expect(send).not.toHaveBeenCalledWith(mockEvent, mockContext, SUCCESS, {}, "static");
expect(send).not.toHaveBeenCalledWith(
mockEvent,
mockContext,
SUCCESS,
{},
"static",
);
});

it("should send a SUCCESS response on Update request type", async () => {
Expand All @@ -64,9 +69,15 @@ describe("CloudFormation Custom Resource Handler", () => {
RequestType: "Update",
};

await handler(mockEvent, {} as Context, () => {});
await handler(mockEvent, mockContext);

expect(send).toHaveBeenCalledWith(mockEvent, mockContext, SUCCESS, {}, "static");
expect(send).toHaveBeenCalledWith(
mockEvent,
mockContext,
SUCCESS,
{},
"static",
);
});

it("should send a SUCCESS response on Delete request type", async () => {
Expand All @@ -75,9 +86,15 @@ describe("CloudFormation Custom Resource Handler", () => {
RequestType: "Delete",
};

await handler(mockEvent, {} as Context, () => {});
await handler(mockEvent, mockContext);

expect(send).toHaveBeenCalledWith(mockEvent, mockContext, SUCCESS, {}, "static");
expect(send).toHaveBeenCalledWith(
mockEvent,
mockContext,
SUCCESS,
{},
"static",
);
});

it("should send a FAILED response on error", async () => {
Expand All @@ -91,8 +108,14 @@ describe("CloudFormation Custom Resource Handler", () => {
send: sendMock,
}));

await handler(mockEvent, {} as Context, () => {});
await handler(mockEvent, mockContext);

expect(send).toHaveBeenCalledWith(mockEvent, mockContext, FAILED, {}, "static");
expect(send).toHaveBeenCalledWith(
mockEvent,
mockContext,
FAILED,
{},
"static",
);
});
});
18 changes: 8 additions & 10 deletions lib/lambda/search.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,22 @@ describe("getSearchData Handler", () => {

expect(res).toBeTruthy();
expect(res.statusCode).toEqual(400);
expect(res.body).toEqual({ message: "Index path parameter required" });
expect(res.body).toEqual(JSON.stringify({ message: "Index path parameter required" }));
});

it.skip("should return 200 with search results", async () => {
it("should return 200 with search results", async () => {
const event = {
body: {
query: {
match_all: {},
},
},
body: JSON.stringify({ query: { match_all: {} } }),
pathParameters: { index: "main" } as APIGatewayProxyEventPathParameters,
requestContext: getRequestContext(makoStateSubmitter),
} as unknown as APIGatewayEvent;
} as APIGatewayEvent;

const res = await handler(event);

expect(res).toBeTruthy();
expect(res.statusCode).toEqual(200);

const body = res.body;
const body = JSON.parse(res.body);
expect(body).toBeTruthy();
expect(body?.hits?.hits).toBeTruthy();
expect(body?.hits?.hits?.length).toEqual(11);
Expand All @@ -47,6 +43,8 @@ describe("getSearchData Handler", () => {

expect(res).toBeTruthy();
expect(res.statusCode).toEqual(500);
expect(res.body).toEqual({ error: "Internal server error", message: "Response Error" });
expect(res.body).toEqual(
JSON.stringify({ error: "Internal server error", message: "Response Error" }),
);
});
});
11 changes: 6 additions & 5 deletions lib/lambda/search.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { handleOpensearchError } from "./utils";
import { APIGatewayEvent } from "aws-lambda";
import { response } from "libs/handler-lib";
import { Index } from "shared-types/opensearch";
import { validateEnvVariable } from "shared-utils";
import { getStateFilter } from "../libs/api/auth/user";
Expand All @@ -10,10 +11,10 @@ import * as os from "../libs/opensearch-lib";
export const getSearchData = async (event: APIGatewayEvent) => {
validateEnvVariable("osDomain");
if (!event.pathParameters || !event.pathParameters.index) {
return {
return response({
statusCode: 400,
body: { message: "Index path parameter required" },
};
});
}
try {
let query: any = {};
Expand Down Expand Up @@ -56,12 +57,12 @@ export const getSearchData = async (event: APIGatewayEvent) => {
}
}

return {
return response<unknown>({
statusCode: 200,
body: results,
};
});
} catch (error) {
return handleOpensearchError(error);
return response(handleOpensearchError(error));
}
};

Expand Down
24 changes: 12 additions & 12 deletions lib/lambda/setupIndex.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
import type { Handler } from "aws-lambda";
import { createIndex, updateFieldMapping } from "../libs/opensearch-lib";
import { Handler } from "aws-lambda";
import * as os from "../libs/opensearch-lib";
import { opensearch } from "../packages/shared-types";

const manageIndexResource = async (resource: {
osDomain: string;
index: opensearch.Index;
update?: object;
}) => {
await createIndex(resource.osDomain, resource.index);
if (!resource.update) return;
await updateFieldMapping(resource.osDomain, resource.index, resource.update);
};

export const handler: Handler = async (event, __, callback) => {
const response = {
statusCode: 200,
Expand Down Expand Up @@ -61,3 +51,13 @@ export const handler: Handler = async (event, __, callback) => {
callback(errorResponse, response);
}
};

const manageIndexResource = async (resource: {
osDomain: string;
index: opensearch.Index;
update?: object;
}) => {
await os.createIndex(resource.osDomain, resource.index);
if (!resource.update) return;
await os.updateFieldMapping(resource.osDomain, resource.index, resource.update);
};
62 changes: 27 additions & 35 deletions lib/lambda/sinkMain.ts
Original file line number Diff line number Diff line change
@@ -1,51 +1,43 @@
import { Handler } from "aws-lambda";
import { KafkaEvent, KafkaRecord } from "shared-types";
import { ErrorType, getTopic, logError } from "../libs";
import { KafkaEvent } from "shared-types";
import { ErrorType, getTopic, logError } from "libs";
import {
insertOneMacRecordsFromKafkaIntoMako,
insertNewSeatoolRecordsFromKafkaIntoMako,
syncSeatoolRecordDatesFromKafkaWithMako,
} from "./sinkMainProcessors";

export const handler: Handler<KafkaEvent> = async (event) => {
const eventInfo = JSON.stringify(event, null, 2);
console.log(`Received event: ${eventInfo}`);
const prettifiedEventJSON = JSON.stringify(event, null, 2);

console.log(`event: ${prettifiedEventJSON}`);

try {
// Process each topicPartition concurrently
await Promise.all(
Object.entries(event.records).map(async ([topicPartition, records]) =>
processTopicPartition(topicPartition, records),
),
Object.entries(event.records).map(async ([topicPartition, records]) => {
const topic = getTopic(topicPartition);

console.log(`topic: ${topic}`);

switch (topic) {
case "aws.onemac.migration.cdc":
return insertOneMacRecordsFromKafkaIntoMako(records, topicPartition);

case "aws.seatool.ksql.onemac.three.agg.State_Plan":
return insertNewSeatoolRecordsFromKafkaIntoMako(records, topicPartition);

case "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan":
return syncSeatoolRecordDatesFromKafkaWithMako(records, topicPartition);

default:
logError({ type: ErrorType.BADTOPIC });
throw new Error(`topic (${topicPartition}) is invalid`);
}
}),
);
} catch (error) {
logError({ type: ErrorType.UNKNOWN, metadata: { event: eventInfo } });
logError({ type: ErrorType.UNKNOWN, metadata: { event: prettifiedEventJSON } });

throw error;
}
};

async function processTopicPartition(
topicPartition: string,
records: KafkaRecord[],
): Promise<void> {
const topic = getTopic(topicPartition);
if (!topic) {
logError({ type: ErrorType.BADTOPIC });
throw new Error(`Invalid topic: ${topicPartition}`);
}

switch (topic) {
case "aws.onemac.migration.cdc":
await insertOneMacRecordsFromKafkaIntoMako(records, topicPartition);
break;
case "aws.seatool.ksql.onemac.three.agg.State_Plan":
await insertNewSeatoolRecordsFromKafkaIntoMako(records, topicPartition);
break;
case "aws.seatool.debezium.changed_date.SEA.dbo.State_Plan":
await syncSeatoolRecordDatesFromKafkaWithMako(records, topicPartition);
break;
default:
logError({ type: ErrorType.BADTOPIC, metadata: { topic } });
throw new Error(`Unsupported topic: ${topic}`);
}
}

0 comments on commit b535756

Please sign in to comment.