Skip to content

Commit

Permalink
Changes required for Mako (#1365)
Browse files Browse the repository at this point in the history
* Add a source service to stream onemac data to kafka (#1292)

* Add a source service to stream data out of dynamo

* set the deploy to be conditional and honor the correct envs

* Add a cross account role to fetch S3 uploads (#1319)

* feat(cross-acct service):  Create a cross acct role for uploads

* correct names

* deps

* Add getobjecttagging perms so we can honor the clean tags
  • Loading branch information
mdial89f authored Sep 12, 2023
1 parent 7546188 commit 9982d86
Show file tree
Hide file tree
Showing 16 changed files with 9,784 additions and 0 deletions.
6 changes: 6 additions & 0 deletions deploy.sh
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,12 @@ services=(
'admin'
)

# Only deploy for higher envs
if [[ "$stage" == "develop" || "$stage" == "master" || "$stage" == "production" ]]; then
services+=('source')
services+=('cross-acct')
fi

set -e
for i in "${services[@]}"; do
deploy $i
Expand Down
57 changes: 57 additions & 0 deletions services/cross-acct/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Logs
logs
*.log
npm-debug.log*

# Runtime data
pids
*.pid
*.seed

# Directory for instrumented libs generated by jscoverage/JSCover
lib-cov

# Coverage directory used by tools like istanbul
coverage

# nyc test coverage
.nyc_output

# Grunt intermediate storage (http://gruntjs.com/creating-plugins#storing-task-files)
.grunt

# node-waf configuration
.lock-wscript

# Compiled binary addons (http://nodejs.org/api/addons.html)
build/Release

# Dependency directories
node_modules
jspm_packages

# Optional npm cache directory
.npm

# Optional REPL history
.node_repl_history

# vim
.*.sw*
Session.vim

# Serverless
.webpack
.serverless

# env
env.yml
.env

# Jetbrains IDEs
.idea

# Serverless warmup plugin temp dir
_warmup

.dynamodb
5 changes: 5 additions & 0 deletions services/cross-acct/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Cross Account

### Purpose

This service exists to create a single IAM role. This role trusts certain IAM entities in MACPro platform account to assume it. The ultimate purpose is to allow a lambda function in MACPro micro to be able to create short lived presigned S3 urls to the uploads bucket in Onemac; in lieu of migrating the S3 data to platform account, we will simply reach back to the original bucket using this cross account role.
56 changes: 56 additions & 0 deletions services/cross-acct/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions services/cross-acct/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
{
"name": "cross-acct",
"description": "",
"version": "1.0.0",
"author": "",
"license": "CC0-1.0",
"devDependencies": {
"serverless-iam-helper": "file:../../plugins/serverless-iam-helper",
"lodash": "^4.17.20"
},
"dependencies": {}
}
58 changes: 58 additions & 0 deletions services/cross-acct/serverless.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
service: cross-acct

frameworkVersion: "3"

plugins:
- serverless-iam-helper
- serverless-s3-bucket-helper

custom:
stage: ${opt:stage, self:provider.stage}
iamPath: ${ssm:/configuration/${self:custom.stage}/iam/path, ssm:/configuration/default/iam/path, "/"}
iamPermissionsBoundaryPolicy: ${ssm:/configuration/${self:custom.stage}/iam/permissionsBoundaryPolicy, ssm:/configuration/default/iam/permissionsBoundaryPolicy, ""}
attachmentsBucketArn: ${cf:uploads-${self:custom.stage}.AttachmentsBucketArn}
destinationAcct: ${ssm:/configuration/${self:custom.stage}/cross-acct/destinationAcct, ssm:/configuration/default/cross-acct/destinationAcct}

provider:
name: aws
runtime: nodejs14.x
region: us-east-1
stage: dev
iam:
role:
path: ${self:custom.iamPath}
permissionsBoundary: !Sub 'arn:aws:iam::${AWS::AccountId}:policy${self:custom.iamPermissionsBoundaryPolicy, ""}'

resources:
Resources:
CrossAccountS3:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Principal:
AWS: ${self:custom.destinationAcct}
Action: "sts:AssumeRole"
Path: ${self:custom.iamPath}
PermissionsBoundary:
Fn::If:
- CreatePermissionsBoundary
- Fn::Join:
- ""
- - "arn:aws:iam::"
- Ref: AWS::AccountId
- ":policy"
- '${self:custom.iamPermissionsBoundaryPolicy, ""}'
- Ref: AWS::NoValue
Policies:
- PolicyName: "S3GetObjectPolicy"
PolicyDocument:
Version: "2012-10-17"
Statement:
- Effect: "Allow"
Action:
- s3:GetObject
- s3:GetObjectTagging
Resource: !Sub ${self:custom.attachmentsBucketArn}/*
7 changes: 7 additions & 0 deletions services/source/.eslintrc.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
rules:
no-console: off
# TODO: remove this after fixing the names
"@typescript-eslint/camelcase": off
"@typescript-eslint/no-var-requires": off
# TODO turn this rule back on once we figure out types for this module
"@typescript-eslint/explicit-module-boundary-types": off
8 changes: 8 additions & 0 deletions services/source/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# package directories
node_modules
jspm_packages

# Serverless directories
.serverless

.dump.json
26 changes: 26 additions & 0 deletions services/source/handlers/cleanupKafka.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { send, SUCCESS, FAILED } from "cfn-response-async";
import * as topics from "./../libs/topics-lib.js";

exports.handler = async function (event, context) {
console.log("Request:", JSON.stringify(event, undefined, 2));
const responseData = {};
let responseStatus = SUCCESS;
try {
const BrokerString = event.ResourceProperties.BrokerString;
const TopicPatternsToDelete =
event.ResourceProperties.TopicPatternsToDelete;
if (event.RequestType === "Create" || event.RequestType == "Update") {
console.log("This resource does nothing on Create and Update events.");
} else if (event.RequestType === "Delete") {
console.log(
`Attempting a delete for each of the following patterns: ${TopicPatternsToDelete}`
);
await topics.deleteTopics(BrokerString, TopicPatternsToDelete);
}
} catch (error) {
console.error(error);
responseStatus = FAILED;
} finally {
await send(event, context, responseStatus, responseData, "static");
}
};
42 changes: 42 additions & 0 deletions services/source/handlers/createTopics.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { send, SUCCESS, FAILED } from "cfn-response-async";
import * as topics from "./../libs/topics-lib.js";

exports.handler = async function (event, context) {
console.log("Request:", JSON.stringify(event, undefined, 2));
const responseData = {};
let responseStatus = SUCCESS;
try {
const TopicsToCreate = event.ResourceProperties.TopicsToCreate;
const BrokerString = event.ResourceProperties.BrokerString;
const topicConfig = TopicsToCreate.map(function (element) {
const topic = element.name;
const replicationFactor = element.replicationFactor || 3;
const numPartitions = element.numPartitions || 1;
if (!topic) {
throw "Invalid configuration for TopicsToCreate. All entries must have a 'name' key with a string value.";
}
if (replicationFactor < 3) {
throw "Invalid configuration for TopicsToCreate. If specified, replicationFactor must be greater than or equal to 3.";
}
if (numPartitions < 1) {
throw "Invalid configuration for TopicsToCreate. If specified, numPartitions must be greater than or equal to 1.";
}
return {
topic,
numPartitions,
replicationFactor,
};
});
console.log(JSON.stringify(topicConfig, null, 2));
if (event.RequestType === "Create" || event.RequestType == "Update") {
await topics.createTopics(BrokerString, topicConfig);
} else if (event.RequestType === "Delete") {
console.log("This resource does nothing on Delete events.");
}
} catch (error) {
console.error(error);
responseStatus = FAILED;
} finally {
await send(event, context, responseStatus, responseData, "static");
}
};
77 changes: 77 additions & 0 deletions services/source/handlers/source.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import AWS from "aws-sdk";
const { Kafka } = require("kafkajs");
const kafka = new Kafka({
clientId: `${process.env.topic}-producer`,
brokers: process.env.brokerString.split(","),
retry: {
initialRetryTime: 300,
retries: 8,
},
ssl: {
rejectUnauthorized: false,
},
});
const producer = kafka.producer();
const signalTraps = ["SIGTERM", "SIGINT", "SIGUSR2", "beforeExit"];
signalTraps.map((type) => {
process.removeListener(type, producer.disconnect);
});
signalTraps.map((type) => {
process.once(type, producer.disconnect);
});
let connected = false;

function unmarshall(r) {
return AWS.DynamoDB.Converter.unmarshall(r, {
convertEmptyValues: true,
wrapNumbers: true,
});
}

function isValid(r) {
if (r.sk && r.sk.includes("SEATool")) {
console.log("Junk record detected.");
return false;
} else {
console.log("Valid record detected.");
return true;
}
}

exports.handler = async function (event) {
const messages = [];
for (const record of event.Records) {
if (record.eventName != "REMOVE") {
const r = unmarshall(record.dynamodb.NewImage);
if (isValid(r)) {
messages.push({
key: r.pk,
value: JSON.stringify(r),
partition: 0,
headers: { source: "onemac" },
});
}
} else {
const r = unmarshall(record.dynamodb.OldImage);
if (isValid(r)) {
messages.push({
key: r.pk,
value: null,
partition: 0,
headers: { source: "onemac" },
});
}
}
}
if (messages.length > 0) {
console.log(`Sending ${messages.length} messages to Kafka`);
if (!connected) {
await producer.connect();
connected = true;
}
await producer.send({
topic: process.env.topic,
messages,
});
}
};
Loading

0 comments on commit 9982d86

Please sign in to comment.