From 2c1c62d6c099cfc7a8a69896203edf6993c565f7 Mon Sep 17 00:00:00 2001
From: Shaun Remekie
Date: Tue, 19 Nov 2024 22:15:18 +0100
Subject: [PATCH] cds-1670 Fix Kinesis integration issue with checking for
 CloudWatch log format in payload (#119)

* updated the Kinesis workflow to check for CloudWatch-formatted log data
  automatically; renamed `ungzip` to `gunzip`

* changelog

* added test test_kinesis_with_cloudwatch_event

* cargo fmt

* update custom lambda to fix cds-1690

* adjust date

---------

Co-authored-by: guyrenny
---
 CHANGELOG.md             |  5 +++
 custom-resource/index.py | 76 ++++++++++++++++++++++++-----------
 src/logs/process.rs      | 70 ++++++++++++++------------------
 template.yaml            |  2 +
 tests/logs.rs            | 86 ++++++++++++++++++++++++++++++++++++++++
 5 files changed, 175 insertions(+), 64 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index c5b8ef89..53d27b28 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,10 @@
 # Changelog
 
+## v1.0.16 / 2024-11-19
+### 🧰 Bug fixes 🧰
+- cds-1690 - Fixed a bug where updating the CloudWatch log group of an existing integration from CloudFormation caused the stack to fail.
+- cds-1670 - Fixed a bug where the Kinesis integration was not correctly checking for CloudWatch-formatted logs in the payload.
+
 ## v1.0.15 / 2024-11-09
 ### 💡 Enhancements 💡
 - Add new parameter `LambdaAssumeRoleARN` which accepts a role ARN that the Lambda will use as its execution role.
diff --git a/custom-resource/index.py b/custom-resource/index.py
index c340235b..74bb7409 100644
--- a/custom-resource/index.py
+++ b/custom-resource/index.py
@@ -1,5 +1,6 @@
 import json
 import boto3
+import os
 import json, time, boto3, time
 from urllib import request, parse, error
 import functools
@@ -402,11 +403,13 @@ def __init__(self, event, context, cfn):
     @handle_exceptions
     def create(self):
         lambda_arn = self.event['ResourceProperties']['LambdaArn']
+        custom_lambda_arn = os.environ['AWS_LAMBDA_FUNCTION_NAME']
         region = self.context.invoked_function_arn.split(":")[3]
         account_id = self.context.invoked_function_arn.split(":")[4]
         logGroupNames = self.params.CloudWatchLogGroupName.split(',')
         LambdaPremissionPrefix = self.params.CloudWatchLogGroupPrefix.split(',')
-
+        environment_variables = {'log_groups': self.params.CloudWatchLogGroupName}
+        self.update_custom_lambda_environment_variables(custom_lambda_arn, environment_variables)
         if LambdaPremissionPrefix and LambdaPremissionPrefix != [""]:
             for prefix in LambdaPremissionPrefix:
                 replaced_prefix = self.check_statmentid_length(prefix)
@@ -429,13 +432,16 @@ def create(self):
             if not LambdaPremissionPrefix or LambdaPremissionPrefix == [""]:
                 if not response.get("subscriptionFilters") or response.get("subscriptionFilters")[0].get("destinationArn") != lambda_arn:
                     replaced_prefix = self.check_statmentid_length(log_group)
-                    response = self.aws_lambda.add_permission(
-                        FunctionName=lambda_arn,
-                        StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}',
-                        Action='lambda:InvokeFunction',
-                        Principal='logs.amazonaws.com',
-                        SourceArn=f'arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*',
-                    )
+                    try:
+                        response = self.aws_lambda.add_permission(
+                            FunctionName=lambda_arn,
+                            StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}',
+                            Action='lambda:InvokeFunction',
+                            Principal='logs.amazonaws.com',
+                            SourceArn=f'arn:aws:logs:{region}:{account_id}:log-group:{log_group}:*',
+                        )
+                    except Exception as e:
+                        print("assuming permission already exists: ", str(e))
             time.sleep(1)
             self.cloudwatch_logs.put_subscription_filter(
                 destinationArn=self.event['ResourceProperties']['LambdaArn'],
@@ -450,8 +456,45 @@ def check_statmentid_length(self, statmentid_prefix):
             updated_prefix = statmentid_prefix[:65] + statmentid_prefix[-5:]
         return updated_prefix
 
+    def update_custom_lambda_environment_variables(self, function_name, new_environment_variables):
+        self.aws_lambda.update_function_configuration(
+            FunctionName=function_name,
+            Environment={
+                'Variables': new_environment_variables
+            }
+        )
+
+    def remove_subscription_filter(self, log_group, lambda_arn):
+        response = self.cloudwatch_logs.describe_subscription_filters(logGroupName=log_group)
+        lambda_arn = self.event['ResourceProperties']['LambdaArn']
+        LambdaPremissionPrefix = self.params.CloudWatchLogGroupPrefix.split(',')
+        for filter in response['subscriptionFilters']:
+            if filter['filterName'] == f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}':
+                self.cloudwatch_logs.delete_subscription_filter(
+                    filterName=f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}',
+                    logGroupName=log_group
+                )
+        if not LambdaPremissionPrefix:
+            replaced_prefix = self.check_statmentid_length(log_group)
+            response = self.aws_lambda.remove_permission(
+                FunctionName=lambda_arn,
+                StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}'
+            )
+
     @handle_exceptions
     def update(self):
+        custom_lambda_name = os.environ['AWS_LAMBDA_FUNCTION_NAME']
+        new_log_group_names = self.params.CloudWatchLogGroupName.split(',')
+        new_environment_variables = {'log_groups': self.params.CloudWatchLogGroupName}
+
+        # drop subscription filters for log groups removed from the parameter
+        old_log_group_names = os.environ.get('log_groups', '').split(',')
+        for old_log_group in old_log_group_names:
+            if old_log_group not in new_log_group_names:
+                self.remove_subscription_filter(old_log_group, custom_lambda_name)
+
+        self.update_custom_lambda_environment_variables(custom_lambda_name, new_environment_variables)
+        self.create()
         err = self.delete()
         if err:
             raise Exception(err)
@@ -461,24 +504,9 @@ def update(self):
     @handle_exceptions
     def delete(self):
         lambda_arn = self.event['ResourceProperties']['LambdaArn']
-        region = self.context.invoked_function_arn.split(":")[3]
-        account_id = self.context.invoked_function_arn.split(":")[4]
         logGroupNames = self.params.CloudWatchLogGroupName.split(',')
-        LambdaPremissionPrefix = self.params.CloudWatchLogGroupPrefix.split(',')
         for log_group in logGroupNames:
-            response = self.cloudwatch_logs.describe_subscription_filters(logGroupName=log_group)
-            for filter in response['subscriptionFilters']:
-                if filter['filterName'] == f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}':
-                    self.cloudwatch_logs.delete_subscription_filter(
-                        filterName=f'coralogix-aws-shipper-cloudwatch-trigger-{lambda_arn[-4:]}',
-                        logGroupName=log_group
-                    )
-            if not LambdaPremissionPrefix:
-                replaced_prefix = self.check_statmentid_length(log_group)
-                response = self.aws_lambda.remove_permission(
-                    FunctionName=lambda_arn,
-                    StatementId=f'allow-trigger-from-{replaced_prefix.replace("/", "-")}'
-                )
+            self.remove_subscription_filter(log_group, lambda_arn)
 
     def handle(self):
         responseStatus = self.cfn.SUCCESS
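The essence of the cds-1690 fix above: the custom resource now remembers which log groups it subscribed (in its own `log_groups` environment variable) and, on update, unsubscribes only the groups that were dropped from the parameter. A minimal standalone sketch of that flow, assuming a plain boto3 environment; `FILTER_PREFIX` and `sync_subscriptions` are illustrative names, not identifiers from the patch:

    import os
    import boto3

    logs = boto3.client("logs")
    aws_lambda = boto3.client("lambda")

    # Illustrative prefix: the real filter name also embeds the last 4
    # characters of the shipper lambda's ARN.
    FILTER_PREFIX = "coralogix-aws-shipper-cloudwatch-trigger"

    def sync_subscriptions(function_name, new_log_groups_csv):
        """Drop subscription filters for log groups removed from the
        parameter, then persist the new list on the function itself."""
        old = set(os.environ.get("log_groups", "").split(","))
        new = set(new_log_groups_csv.split(","))

        for log_group in old - new:
            resp = logs.describe_subscription_filters(logGroupName=log_group)
            for f in resp["subscriptionFilters"]:
                if f["filterName"].startswith(FILTER_PREFIX):
                    logs.delete_subscription_filter(
                        filterName=f["filterName"], logGroupName=log_group
                    )

        # Persist state as an environment variable on the custom resource
        # lambda, so the next update can diff against it.
        aws_lambda.update_function_configuration(
            FunctionName=function_name,
            Environment={"Variables": {"log_groups": new_log_groups_csv}},
        )

Persisting state this way is also why the IAM policy in the template.yaml diff below gains lambda:GetFunctionConfiguration and lambda:UpdateFunctionConfiguration.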
diff --git a/src/logs/process.rs b/src/logs/process.rs
index 1aae8909..730502f1 100644
--- a/src/logs/process.rs
+++ b/src/logs/process.rs
@@ -152,10 +152,6 @@ impl Default for Metadata {
     }
 }
 
-fn is_gzipped(data: &[u8]) -> bool {
-    // Check the first two bytes for gzip magic numbers
-    data.len() > 1 && data[0] == 0x1f && data[1] == 0x8b
-}
 pub async fn kinesis_logs(
     kinesis_message: Base64Data,
     coralogix_exporter: DynLogExporter,
@@ -179,39 +175,33 @@ pub async fn kinesis_logs(
         .unwrap_or_else(|| "NO SUBSYSTEM NAME".to_string());
 
     let v = kinesis_message.0;
-    let batches = if config.integration_type == IntegrationType::CloudWatch {
-        tracing::debug!("CloudWatch IntegrationType Detected");
+    let decompressed_data = match gunzip(v.clone(), String::new()) {
+        Ok(data) => data,
+        Err(_) => {
+            tracing::error!("Data does not appear to be valid gzip format. Treating as UTF-8");
+            v // fall back to the original data if decompression fails
+        }
+    };
 
-        let cloudwatch_payload = ungzip(v, String::new())?;
+    let decoded_data = match String::from_utf8(decompressed_data) {
+        Ok(s) => s,
+        Err(error) => {
+            tracing::error!(?error, "Failed to decode data");
+            String::new()
+        }
+    };
 
-        let string_cw = String::from_utf8(cloudwatch_payload)?;
-        tracing::debug!("CloudWatch Payload {:?}", string_cw);
-        let log_data: LogData = serde_json::from_str(&string_cw)?;
-        process_cloudwatch_logs(log_data, config.sampling, &config.blocking_pattern).await?
-    } else {
-        let ungzipped_data = if is_gzipped(&v) {
-            // It looks like gzip, attempt to ungzip
-            match ungzip(v.clone(), String::new()) {
-                Ok(un_v) => un_v,
-                Err(_) => {
-                    tracing::error!(
-                        "Data does not appear to be valid gzip format. Treating as UTF-8"
-                    );
-                    v
-                }
-            }
-        } else {
-            // Not gzip, treat as UTF-8
-            v.clone()
-        };
-        match String::from_utf8(ungzipped_data) {
-            Ok(s) => {
-                tracing::debug!("Kinesis Message: {:?}", s);
-                vec![s]
-            }
-            Err(error) => {
-                tracing::error!(?error, "Failed to decode data");
+    // A payload that parses as CloudWatch log data is processed as such; anything else ships as a plain Kinesis record.
+    let batches = match serde_json::from_str(&decoded_data) {
+        Ok(logs) => {
+            process_cloudwatch_logs(logs, config.sampling, &config.blocking_pattern).await?
+        }
+        Err(_) => {
+            tracing::error!("Failed to decode data");
+            if decoded_data.is_empty() {
                 Vec::new()
+            } else {
+                vec![decoded_data]
             }
         }
     };
@@ -458,7 +448,7 @@ async fn process_vpcflows(
     key: String,
 ) -> Result<Vec<String>, Error> {
     info!("VPC Flow Integration Type");
-    let v = ungzip(raw_data, key)?;
+    let v = gunzip(raw_data, key)?;
     let s = String::from_utf8(v)?;
     let array_s = split(Regex::new(r"\n")?, s.as_str())?;
     let flow_header = split(Regex::new(r"\s+")?, array_s[0])?;
@@ -492,7 +482,7 @@ async fn process_csv(
         csv_delimiter = "\t";
     }
     let s = if key_path.extension() == Some(OsStr::new("gz")) {
-        let v = ungzip(raw_data, key)?;
+        let v = gunzip(raw_data, key)?;
         let s = String::from_utf8(v)?;
         debug!("ZIP S3 object: {}", s);
         s
@@ -547,7 +537,7 @@ async fn process_s3(
     key: String,
 ) -> Result<Vec<String>, Error> {
     let s = if key_path.extension() == Some(OsStr::new("gz")) {
-        let v = ungzip(raw_data, key)?;
+        let v = gunzip(raw_data, key)?;
        let s = String::from_utf8(v)?;
         debug!("ZIP S3 object: {}", s);
         s
@@ -587,7 +577,7 @@ async fn process_cloudtrail(
     key: String,
 ) -> Result<Vec<String>, Error> {
     tracing::info!("Cloudtrail Integration Type");
-    let v = ungzip(raw_data, key)?;
+    let v = gunzip(raw_data, key)?;
     let s = String::from_utf8(v)?;
     let mut logs_vec: Vec<String> = Vec::new();
     let array_s = serde_json::from_str::<serde_json::Value>(&s)?;
@@ -665,7 +655,7 @@ pub async fn kafka_logs(
     Ok(())
 }
 
-fn ungzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
+fn gunzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
     if compressed_data.is_empty() {
         tracing::warn!("Input data is empty, cannot ungzip a zero-byte file.");
         return Ok(Vec::new());
@@ -688,7 +678,7 @@ fn ungzip(compressed_data: Vec<u8>, _: String) -> Result<Vec<u8>, Error> {
             "Problem decompressing data after {} bytes",
             output.len()
         );
-        return Ok(output);
+        break;
     }
 }
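The new `kinesis_logs` flow above no longer branches on the configured integration type: it always tries to gunzip, falls back to the raw bytes, and then lets a JSON parse decide whether the record is a CloudWatch envelope. A rough Python rendering of that control flow, for illustration only (the shipper itself is the Rust above; `classify_kinesis_record` is a hypothetical name):

    import gzip
    import json

    def classify_kinesis_record(data: bytes):
        """Decompression and CloudWatch detection are driven by the payload
        itself rather than by the configured INTEGRATION_TYPE."""
        try:
            data = gzip.decompress(data)  # gunzip when the payload is gzipped
        except OSError:
            pass                          # not gzip: keep the original bytes

        text = data.decode("utf-8", errors="replace")
        try:
            envelope = json.loads(text)   # CloudWatch subscription envelope?
            return [e["message"] for e in envelope["logEvents"]]
        except (ValueError, TypeError, KeyError):
            return [text] if text else []  # plain Kinesis record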
diff --git a/template.yaml b/template.yaml
index a613340c..c99c18dd 100644
--- a/template.yaml
+++ b/template.yaml
@@ -1197,6 +1197,8 @@ Resources:
                 - lambda:CreateEventSourceMapping
                 - lambda:DeleteEventSourceMapping
                 - lambda:UpdateEventSourceMapping
+                - lambda:GetFunctionConfiguration
+                - lambda:UpdateFunctionConfiguration
               Resource: '*'
         - Statement:
             - Sid: S3NotificationPolicy
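The test that follows feeds `kinesis_logs` a base64-encoded but *uncompressed* CloudWatch envelope while `INTEGRATION_TYPE` is set to `Kinesis`, which is exactly the case cds-1670 fixes. If you need to regenerate an equivalent `data` field, something along these lines works (a sketch; the output is not byte-identical to the committed fixture, and the `id` values here are placeholders):

    import base64
    import json

    envelope = {
        "owner": "111111111111",
        "logGroup": "CloudTrail/logs",
        "logStream": "111111111111_CloudTrail/logs_us-east-1",
        "subscriptionFilters": ["Destination"],
        "messageType": "DATA_MESSAGE",
        "logEvents": [
            {"id": "0", "timestamp": 1432826855000, "message": "hello world"},
            {"id": "1", "timestamp": 1432826855000, "message": "goodbye dreams"},
        ],
    }

    # Real CloudWatch subscription data arrives gzipped; leaving this payload
    # uncompressed exercises the new fallback path in kinesis_logs.
    data = base64.b64encode(json.dumps(envelope).encode()).decode()
    print(data)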
Some("Kinesis")), + ], + run_kinesis_with_cloudwatch_event(), + ) + .await; +} + async fn run_cloudfront_s3_event() { let s3_client = get_mock_s3client(Some("./tests/fixtures/cloudfront.gz")) .expect("failed to create s3 client");