Skip to content

Commit

Permalink
Reassign kafka event source mapping to lambda (#139750)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmrabian authored Aug 27, 2024
1 parent 6fe173c commit 2b21a6f
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 474 deletions.
102 changes: 0 additions & 102 deletions services/carts-bigmac-streams/handlers/configureConnectors.js

This file was deleted.

78 changes: 43 additions & 35 deletions services/carts-bigmac-streams/handlers/sinkEnrollmentCounts.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,47 +14,55 @@ const {
* @param {*} _callback
*/
async function myHandler(event, _context, _callback) {
const json = JSON.parse(event.value);
const sedsTopicKey = `${process.env.sedsTopic}-0`;
if (!event?.records?.[sedsTopicKey]) {
return;
}
const records = event.records[sedsTopicKey];
const currentYear = getReportingYear();
const dynamoClient = buildClient();

if (
json.NewImage.enrollmentCounts &&
json.NewImage.enrollmentCounts.year >= currentYear - 1 &&
json.NewImage.quarter === 4
) {
try {
// eslint-disable-next-line no-console
console.log("Sink message received", json);
const indexToUpdate =
json.NewImage.enrollmentCounts.year === currentYear ? 2 : 1;
let typeOfEnrollment = "Medicaid Expansion CHIP";
let typeKey = "medicaid_exp_chip";
if (json.NewImage.enrollmentCounts.type === "separate") {
typeOfEnrollment = "Separate CHIP";
typeKey = "separate_chip";
}
const stateId = json.NewImage.state_id;
const createdTime = new Date().toLocaleString();
for (const record of records) {
const decodedValue = atob(record.value);
const value = JSON.parse(decodedValue);
if (
value.NewImage.enrollmentCounts &&
value.NewImage.enrollmentCounts.year >= currentYear - 1 &&
value.NewImage.quarter === 4
) {
try {
// eslint-disable-next-line no-console
console.log("Sink message received", value);
const indexToUpdate =
value.NewImage.enrollmentCounts.year === currentYear ? 2 : 1;
let typeOfEnrollment = "Medicaid Expansion CHIP";
let typeKey = "medicaid_exp_chip";
if (value.NewImage.enrollmentCounts.type === "separate") {
typeOfEnrollment = "Separate CHIP";
typeKey = "separate_chip";
}
const stateId = value.NewImage.state_id;
const createdTime = new Date().toLocaleString();

const pk = `${stateId}-${currentYear}`;
const entryKey = `${typeKey}-${indexToUpdate}`;
const pk = `${stateId}-${currentYear}`;
const entryKey = `${typeKey}-${indexToUpdate}`;

const enrollmentEntry = {
filterId: `${currentYear}-02`,
typeOfEnrollment,
indexToUpdate,
stateId,
yearToModify: currentYear,
enrollmentCount: json.NewImage.enrollmentCounts.count,
createdTime,
lastSynced: json.NewImage.lastSynced,
};
const enrollmentEntry = {
filterId: `${currentYear}-02`,
typeOfEnrollment,
indexToUpdate,
stateId,
yearToModify: currentYear,
enrollmentCount: value.NewImage.enrollmentCounts.count,
createdTime,
lastSynced: value.NewImage.lastSynced ?? "",
};

await updateEnrollment(pk, entryKey, enrollmentEntry, dynamoClient);
} catch (error) {
// eslint-disable-next-line no-console
console.log(error);
await updateEnrollment(pk, entryKey, enrollmentEntry, dynamoClient);
} catch (error) {
// eslint-disable-next-line no-console
console.log(error);
}
}
}
}
Expand Down
8 changes: 2 additions & 6 deletions services/carts-bigmac-streams/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,8 @@
"description": "",
"dependencies": {
"@aws-sdk/client-dynamodb": "^3.621.0",
"@aws-sdk/client-ecs": "^3.621.0",
"@aws-sdk/lib-dynamodb": "^3.621.0",
"@aws-sdk/util-dynamodb": "^3.621.0",
"kafkajs": "^1.15.0",
"lodash": "^4.17.21",
"uuid": "^8.3.2"
},
"devDependencies": {}
"kafkajs": "^1.15.0"
}
}
Loading

0 comments on commit 2b21a6f

Please sign in to comment.