cds-1670 Fix Kinesis Integration issue with checking for Cloudwatch Log format in Payload (#119)

* updated kinesis workflow to check for Cloudwatch formatted log data automatically, renamed ungzip to gunzip (the detection chain is sketched below, after the file summary)

* changelog

* added test test_kinesis_with_cloudwatch_event

* cargo fmt

* update custom lambda to fix cds-1690

* adjust date

---------

Co-authored-by: guyrenny <[email protected]>
daidokoro and guyrenny authored Nov 19, 2024
1 parent 9eb1036 commit 2c1c62d
Showing 5 changed files with 175 additions and 64 deletions.
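
The substance of cds-1670 lives in `src/logs/process.rs`: instead of branching on `IntegrationType::CloudWatch`, `kinesis_logs` now always attempts gunzip, then UTF-8 decoding, then a CloudWatch JSON parse, falling back one step at a time. Below is a minimal, self-contained sketch of that detection chain — illustrative only, assuming the `flate2` and `serde_json` crates; `detect_and_extract` is a hypothetical helper, not the shipper's actual function:

```rust
// Illustrative sketch of the detection chain, not the shipper's exact code.
// Assumes flate2 = "1" and serde_json = "1" as dependencies.
use flate2::read::GzDecoder;
use serde_json::Value;
use std::io::Read;

/// Hypothetical helper: turn one decoded Kinesis record payload into log lines.
fn detect_and_extract(payload: Vec<u8>) -> Vec<String> {
    // Step 1: try gunzip; fall back to the raw bytes if the payload is not gzip.
    let mut decompressed = Vec::new();
    let bytes = match GzDecoder::new(payload.as_slice()).read_to_end(&mut decompressed) {
        Ok(_) => decompressed,
        Err(_) => payload, // not gzip, treat as-is
    };

    // Step 2: decode as UTF-8; undecodable data yields no log lines.
    let text = match String::from_utf8(bytes) {
        Ok(s) => s,
        Err(_) => return Vec::new(),
    };

    // Step 3: if the text is a CloudWatch Logs envelope, extract the individual
    // event messages; otherwise ship the whole text as a single log line.
    match serde_json::from_str::<Value>(&text) {
        Ok(v) => match v.get("logEvents").and_then(Value::as_array) {
            Some(events) => events
                .iter()
                .filter_map(|e| e.get("message").and_then(Value::as_str))
                .map(str::to_owned)
                .collect(),
            None => vec![text],
        },
        Err(_) => vec![text],
    }
}
```

With this chain a plain-text record, a gzipped record, and a CloudWatch Logs subscription payload all take the same path, so the integration no longer needs the `INTEGRATION_TYPE` setting to recognize CloudWatch data.
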
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,10 @@
# Changelog

## v1.0.16 / 2024-11-19
### 🧰 Bug fixes 🧰
- cds-1690 - Fixed a bug where updating the CloudWatch log group for an existing integration from CloudFormation caused the stack to fail.
- cds-1670 - Fixed a bug where the Kinesis integration was not correctly checking for CloudWatch-formatted logs in the payload.

## v1.0.15 / 2024-11-09
### 💡 Enhancements 💡
- Add new parameter `LambdaAssumeRoleARN`, which accepts a role ARN that the Lambda will use as its execution role.
76 changes: 52 additions & 24 deletions custom-resource/index.py
@@ -1,5 +1,6 @@
import json
import boto3
import os
import json, time, boto3, time
from urllib import request, parse, error
import functools
@@ -402,11 +403,13 @@ def __init__(self, event, context, cfn):
@handle_exceptions
def create(self):
lambda_arn = self.event['ResourceProperties']['LambdaArn']
custom_lambda_arn = os.environ['AWS_LAMBDA_FUNCTION_NAME']
region = self.context.invoked_function_arn.split(":")[3]
account_id = self.context.invoked_function_arn.split(":")[4]
logGroupNames = self.params.CloudWatchLogGroupName.split(',')
LambdaPremissionPrefix = self.params.CloudWatchLogGroupPrefix.split(',')

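# New in this commit: persist the configured log groups in this function's
# own environment so a later stack update can diff old vs. new groups (see update()).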
environment_variables = {'log_groups': self.params.CloudWatchLogGroupName}
self.update_custom_lambda_environment_variables(custom_lambda_arn, environment_variables)
if LambdaPremissionPrefix and LambdaPremissionPrefix != [""]:
for prefix in LambdaPremissionPrefix:
replaced_prefix = self.check_statmentid_length(prefix)
@@ -429,13 +432,16 @@ def create(self):
if not LambdaPremissionPrefix or LambdaPremissionPrefix == [""]:
if not response.get("subscriptionFilters") or response.get("subscriptionFilters")[0].get("destinationArn") != lambda_arn:
replaced_prefix = self.check_statmentid_length(log_group)
response = self.aws_lambda.add_permission(
FunctionName=lambda_arn,
StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}',
Action='lambda:InvokeFunction',
Principal='logs.amazonaws.com',
SourceArn=f'arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*',
)
try:
response = self.aws_lambda.add_permission(
FunctionName=lambda_arn,
StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}',
Action='lambda:InvokeFunction',
Principal='logs.amazonaws.com',
SourceArn=f'arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*',
)
except Exception as e:
print("assuming permission already exists: ", str(e))
time.sleep(1)
self.cloudwatch_logs.put_subscription_filter(
destinationArn=self.event['ResourceProperties']['LambdaArn'],
@@ -450,8 +456,45 @@ def check_statmentid_length(self, statmentid_prefix):
updated_prefix = statmentid_prefix[:65] + statmentid_prefix[-5:]
return updated_prefix

def update_custom_lambda_environment_variables(self, function_name, new_environment_variables):
self.aws_lambda.update_function_configuration(
FunctionName=function_name,
Environment={
'Variables': new_environment_variables
}
)

def remove_subscription_filter(self, log_group, lambda_arn):
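# Helper factored out of delete(): removes the shipper's subscription filter
# from the given log group and, when no prefix-based permissions are
# configured, drops the matching lambda:InvokeFunction permission.
# Reused by both delete() and update().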
response = self.cloudwatch_logs.describe_subscription_filters(logGroupName=log_group)
lambda_arn = self.event['ResourceProperties']['LambdaArn']
LambdaPremissionPrefix = self.params.CloudWatchLogGroupPrefix.split(',')
for filter in response['subscriptionFilters']:
if filter['filterName'] == f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}':
self.cloudwatch_logs.delete_subscription_filter(
filterName=f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}',
logGroupName=log_group
)
if not LambdaPremissionPrefix:
replaced_prefix = self.check_statmentid_length(log_group)
response = self.aws_lambda.remove_permission(
FunctionName=lambda_arn,
StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}'
)

@handle_exceptions
def update(self):
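# cds-1690 fix: on a stack update, diff the old log-group list (stored in
# this function's log_groups env var) against the new parameters, remove
# subscription filters for groups that were dropped, refresh the env var,
# and re-run create() for the current set.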
custom_lambda_name = os.environ['AWS_LAMBDA_FUNCTION_NAME']
new_log_group_names = self.params.CloudWatchLogGroupName.split(',')
new_environment_variables = {'log_groups': self.params.CloudWatchLogGroupName}

old_log_group_names = os.environ.get('log_groups').split(',')
for old_log_group in old_log_group_names:
if old_log_group not in new_log_group_names:
self.remove_subscription_filter(old_log_group, custom_lambda_name)

self.update_custom_lambda_environment_variables(custom_lambda_name, new_environment_variables)
self.create()

err = self.delete()
if err:
raise Exception(err)
@@ -461,24 +504,9 @@ def update(self):
@handle_exceptions
def delete(self):
lambda_arn = self.event['ResourceProperties']['LambdaArn']
region = self.context.invoked_function_arn.split(":")[3]
account_id = self.context.invoked_function_arn.split(":")[4]
logGroupNames = self.params.CloudWatchLogGroupName.split(',')
LambdaPremissionPrefix = self.params.CloudWatchLogGroupPrefix.split(',')
for log_group in logGroupNames:
response = self.cloudwatch_logs.describe_subscription_filters(logGroupName=log_group)
for filter in response['subscriptionFilters']:
if filter['filterName'] == f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}':
self.cloudwatch_logs.delete_subscription_filter(
filterName=f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}',
logGroupName=log_group
)
if not LambdaPremissionPrefix:
replaced_prefix = self.check_statmentid_length(log_group)
response = self.aws_lambda.remove_permission(
FunctionName=lambda_arn,
StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}'
)
self.remove_subscription_filter(log_group, lambda_arn)

def handle(self):
responseStatus = self.cfn.SUCCESS
70 changes: 30 additions & 40 deletions src/logs/process.rs
@@ -152,10 +152,6 @@ impl Default for Metadata {
}
}

fn is_gzipped(data: &[u8]) -> bool {
// Check the first two bytes for gzip magic numbers
data.len() > 1 && data[0] == 0x1f && data[1] == 0x8b
}
pub async fn kinesis_logs(
kinesis_message: Base64Data,
coralogix_exporter: DynLogExporter,
@@ -179,39 +175,33 @@
.unwrap_or_else(|| "NO SUBSYSTEM NAME".to_string());
let v = kinesis_message.0;

let batches = if config.integration_type == IntegrationType::CloudWatch {
tracing::debug!("CloudWatch IntegrationType Detected");
let decompressed_data = match gunzip(v.clone(), String::new()) {
Ok(data) => data,
Err(_) => {
tracing::error!("Data does not appear to be valid gzip format. Treating as UTF-8");
v // set decompressed_data to the original data if decompression fails
}
};

let cloudwatch_payload = ungzip(v, String::new())?;
let decoded_data = match String::from_utf8(decompressed_data) {
Ok(s) => s,
Err(error) => {
tracing::error!(?error, "Failed to decode data");
String::new()
}
};

let string_cw = String::from_utf8(cloudwatch_payload)?;
tracing::debug!("CloudWatch Payload {:?}", string_cw);
let log_data: LogData = serde_json::from_str(&string_cw)?;
process_cloudwatch_logs(log_data, config.sampling, &config.blocking_pattern).await?
} else {
let ungzipped_data = if is_gzipped(&v) {
// It looks like gzip, attempt to ungzip
match ungzip(v.clone(), String::new()) {
Ok(un_v) => un_v,
Err(_) => {
tracing::error!(
"Data does not appear to be valid gzip format. Treating as UTF-8"
);
v
}
}
} else {
// Not gzip, treat as UTF-8
v.clone()
};
match String::from_utf8(ungzipped_data) {
Ok(s) => {
tracing::debug!("Kinesis Message: {:?}", s);
vec![s]
}
Err(error) => {
tracing::error!(?error, "Failed to decode data");
// String::from_utf8(decompressed_data.clone())?;
let batches = match serde_json::from_str(&decoded_data) {
Ok(logs) => {
process_cloudwatch_logs(logs, config.sampling, &config.blocking_pattern).await?
}
Err(_) => {
tracing::error!("Failed to decode data");
if decoded_data.is_empty() {
Vec::new()
} else {
vec![decoded_data]
}
}
};
@@ -458,7 +448,7 @@ async fn process_vpcflows(
key: String,
) -> Result<Vec<String>, Error> {
info!("VPC Flow Integration Type");
let v = ungzip(raw_data, key)?;
let v = gunzip(raw_data, key)?;
let s = String::from_utf8(v)?;
let array_s = split(Regex::new(r"\n")?, s.as_str())?;
let flow_header = split(Regex::new(r"\s+")?, array_s[0])?;
@@ -492,7 +482,7 @@ async fn process_csv(
csv_delimiter = "\t";
}
let s = if key_path.extension() == Some(OsStr::new("gz")) {
let v = ungzip(raw_data, key)?;
let v = gunzip(raw_data, key)?;
let s = String::from_utf8(v)?;
debug!("ZIP S3 object: {}", s);
s
@@ -547,7 +537,7 @@ async fn process_s3(
key: String,
) -> Result<Vec<String>, Error> {
let s = if key_path.extension() == Some(OsStr::new("gz")) {
let v = ungzip(raw_data, key)?;
let v = gunzip(raw_data, key)?;
let s = String::from_utf8(v)?;
debug!("ZIP S3 object: {}", s);
s
@@ -587,7 +577,7 @@ async fn process_cloudtrail(
key: String,
) -> Result<Vec<String>, Error> {
tracing::info!("Cloudtrail Integration Type");
let v = ungzip(raw_data, key)?;
let v = gunzip(raw_data, key)?;
let s = String::from_utf8(v)?;
let mut logs_vec: Vec<String> = Vec::new();
let array_s = serde_json::from_str::<serde_json::Value>(&s)?;
@@ -665,7 +655,7 @@ pub async fn kafka_logs(
Ok(())
}

fn ungzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
fn gunzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
if compressed_data.is_empty() {
tracing::warn!("Input data is empty, cannot ungzip a zero-byte file.");
return Ok(Vec::new());
@@ -688,7 +678,7 @@ fn ungzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
"Problem decompressing data after {} bytes",
output.len()
);
return Ok(output);
break;
}
}
}
2 changes: 2 additions & 0 deletions template.yaml
@@ -1197,6 +1197,8 @@ Resources:
- lambda:CreateEventSourceMapping
- lambda:DeleteEventSourceMapping
- lambda:UpdateEventSourceMapping
- lambda:GetFunctionConfiguration
- lambda:UpdateFunctionConfiguration
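# Added in this commit so the custom resource can update its own env vars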
Resource: '*'
- Statement:
- Sid: S3NotificationPolicy
86 changes: 86 additions & 0 deletions tests/logs.rs
@@ -1308,6 +1308,92 @@ async fn test_kinesis_event() {
.await;
}

async fn run_kinesis_with_cloudwatch_event() {
let config = Config::load_from_env().unwrap();
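// The record's base64 data decodes to an *uncompressed* CloudWatch Logs
// envelope with two events ("hello world", "goodbye dreams"), exercising
// the new path where gunzip fails and the payload is parsed as CloudWatch
// JSON directly.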
let evt: Combined = serde_json::from_str(
r#"{
"Records": [
{
"awsRegion": "us-east-1",
"eventID": "shardId-000000000000:00000000000000000000000000000000000000000000000000000000",
"eventName": "aws:kinesis:record",
"eventSource": "aws:kinesis",
"eventSourceARN": "arn:aws:kinesis:us-east-1:0000000000:stream/mystream",
"eventVersion": "1.0",
"invokeIdentityArn": "arn:aws:iam::0000000000:role/cargo-lambda-role-0000000-0000-0000-0000-00000000000",
"kinesis": {
"approximateArrivalTimestamp": 1704715421.323,
"data": "eyJvd25lciI6ICIxMTExMTExMTExMTEiLCAibG9nR3JvdXAiOiAiQ2xvdWRUcmFpbC9sb2dzIiwgImxvZ1N0cmVhbSI6ICIxMTExMTExMTExMTFfQ2xvdWRUcmFpbC9sb2dzX3VzLWVhc3QtMSIsICJzdWJzY3JpcHRpb25GaWx0ZXJzIjogWyJEZXN0aW5hdGlvbiJdLCAibWVzc2FnZVR5cGUiOiAiREFUQV9NRVNTQUdFIiwgImxvZ0V2ZW50cyI6IFt7ImlkIjogIjMxOTUzMTA2NjA2OTY2OTgzMzc4ODA5MDI1MDc5ODA0MjExMTQzMjg5NjE1NDI0Mjk4MjIxNTY4IiwgInRpbWVzdGFtcCI6IDE0MzI4MjY4NTUwMDAsICJtZXNzYWdlIjogImhlbGxvIHdvcmxkIn0sIHsiaWQiOiAiMzE5NTMxMDY2MDY5NjY5ODMzNzg4MDkwMjUwNzk4MDQyMTExNDMyODk2MTU0MjQyOTgyMjE1NjkiLCAidGltZXN0YW1wIjogMTQzMjgyNjg1NTAwMCwgIm1lc3NhZ2UiOiAiZ29vZGJ5ZSBkcmVhbXMifV19Cg==",
"kinesisSchemaVersion": "1.0",
"partitionKey": "partition_key",
"sequenceNumber": "49647983248916725783135500075978324609922193443375808530"
}
}
]
}"#,
)
.expect("failed to parse kinesis_event");

let exporter = Arc::new(FakeLogExporter::new());
let event = LambdaEvent::new(evt, Context::default());
let sqs_client = get_mock_sqsclient(None).unwrap();
let s3_client = get_mock_s3client(None).unwrap();
let ecr_client = get_mock_ecrclient(None).unwrap();
let clients = AwsClients {
s3: s3_client,
sqs: sqs_client,
ecr: ecr_client,
};

coralogix_aws_shipper::logs::handler(&clients, exporter.clone(), &config, event)
.await
.unwrap();

let bulks = exporter.take_bulks();
assert!(bulks.is_empty());

let singles = exporter.take_singles();
assert_eq!(singles.len(), 1);
assert_eq!(singles[0].entries.len(), 2);
let log_lines = vec!["hello world", "goodbye dreams"];

for (i, log_line) in log_lines.iter().enumerate() {
assert!(
singles[0].entries[i].body == *log_line,
"log line: {}",
singles[0].entries[i].body
);
}

assert!(
singles[0].entries[0].application_name == "integration-testing",
"got application_name: {}",
singles[0].entries[0].application_name
);
assert!(
singles[0].entries[0].subsystem_name == "lambda",
"got subsystem_name: {}",
singles[0].entries[0].subsystem_name
);
}

#[tokio::test]
async fn test_kinesis_with_cloudwatch_event() {
temp_env::async_with_vars(
[
("CORALOGIX_API_KEY", Some("1234456789X")),
("APP_NAME", Some("integration-testing")),
("CORALOGIX_ENDPOINT", Some("localhost:8080")),
("SAMPLING", Some("1")),
("SUB_NAME", Some("lambda")),
("AWS_REGION", Some("eu-central-1")),
("INTEGRATION_TYPE", Some("Kinesis")),
],
run_kinesis_with_cloudwatch_event(),
)
.await;
}

async fn run_cloudfront_s3_event() {
let s3_client = get_mock_s3client(Some("./tests/fixtures/cloudfront.gz"))
.expect("failed to create s3 client");
