Skip to content

Commit

Permalink
feat gsn-10917: comply interface terminology to security hub
Browse files Browse the repository at this point in the history
  • Loading branch information
carlovoSBP committed Jul 10, 2024
1 parent 671f3f6 commit 230bc0d
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 37 deletions.
44 changes: 22 additions & 22 deletions awsfindingsmanagerlib/awsfindingsmanagerlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,12 +326,12 @@ def is_matching_rule(self, rule: Rule) -> bool:
"""Checks a rule for a match with the finding.
If any of control_id, security_control_id or rule_id attributes match between the rule and the finding and the
rule does not have any filtering attributes like resource_ids or tags then it is considered a match. (Big blast
radius) only matching on the control.
rule does not have any filtering attributes like resource_id_regex_list or tags then it is considered a match.
(Big blast radius) only matching on the control.
If the rule has any attributes like resource_ids or tags then a secondary match is searched for any of them with
the corresponding finding attributes. If any match is found then the rule is found matching if none are matching
then the rule is not considered a matching rule.
If the rule has any attributes like resource_id_regex_list or tags then a secondary match is searched for any of
them with the corresponding finding attributes. If any match is found then the rule is found matching if none
are matching then the rule is not considered a matching rule.
Args:
rule: The rule object to match with.
Expand All @@ -345,14 +345,14 @@ def is_matching_rule(self, rule: Rule) -> bool:
"""
if not isinstance(rule, Rule):
raise InvalidRuleType(rule)
if any([self.control_id == rule.control_id,
self.rule_id == rule.control_id,
if any([self.control_id == rule.rule_or_control_id,
self.rule_id == rule.rule_or_control_id,
self.security_control_id == rule.security_control_id]):
self._logger.debug(f'Matched with rule "{rule.note}" on one of "control_id, security_control_id"')
if not any([rule.resource_ids, rule.resource_ids]):
if not any([rule.tags, rule.resource_id_regex_list]):
self._logger.debug(f'Rule "{rule.note}" does not seem to have filters for resources or tags.')
return True
if any([self.is_matching_tags(rule.tags), self.is_matching_resource_ids(rule.resource_ids)]):
if any([self.is_matching_tags(rule.tags), self.is_matching_resource_ids(rule.resource_id_regex_list)]):
self._logger.debug(f'Matched with rule "{rule.note}" either on resources or tags.')
return True
return False
Expand Down Expand Up @@ -402,22 +402,22 @@ def security_control_id(self) -> str:
return self.match_on.get('security_control_id', '')

@property
def control_id(self) -> str:
def rule_or_control_id(self) -> str:
"""The control ID if any, empty string otherwise."""
return self.match_on.get('control_id', '')
return self.match_on.get('rule_or_control_id', '')

@property
def resource_ids(self) -> List[Optional[str]]:
def resource_id_regex_list(self) -> List[Optional[str]]:
"""The resource ids specified under the match_on attribute."""
return self.match_on.get('resource_ids', [])
return self.match_on.get('resource_id_regex_list', [])

@property
def tags(self) -> List[Optional[str]]:
"""The tags specified under the match_on attribute."""
return self.match_on.get('tags', [])

@staticmethod
def _get_control_id_query(match_on_data) -> Dict:
def _get_rule_or_control_id_query(match_on_data) -> Dict:
"""Constructs a valid query based on a set control ID if any.
Args:
Expand All @@ -427,16 +427,16 @@ def _get_control_id_query(match_on_data) -> Dict:
The query matching the set control ID, empty dictionary otherwise.
"""
control_id = match_on_data.get('control_id')
if not control_id:
rule_or_control_id = match_on_data.get('rule_or_control_id')
if not rule_or_control_id:
return {}
# For the CIS AWS Foundations Benchmark standard, the field is RuleId
# for other standards the field is ControlId, so we use both.
return {'ProductFields': [{'Key': 'ControlId',
'Value': control_id,
'Value': rule_or_control_id,
'Comparison': 'EQUALS'},
{'Key': 'RuleId',
'Value': control_id,
'Value': rule_or_control_id,
'Comparison': 'EQUALS'}]}

@staticmethod
Expand Down Expand Up @@ -484,7 +484,7 @@ def query_filter(self) -> Dict:
"""
query = deepcopy(DEFAULT_SECURITY_HUB_FILTER)
query.update(self._get_control_id_query(self.match_on))
query.update(self._get_rule_or_control_id_query(self.match_on))
query.update(self._get_security_control_id_query(self.match_on))
query.update(self._get_tag_query(self.match_on))
return deepcopy(query)
Expand Down Expand Up @@ -733,9 +733,9 @@ def _get_findings(self, query_filter: Dict):

@staticmethod
def _get_matching_findings(rule: Rule, findings: List[Finding], logger: logging.Logger) -> List[Finding]:
if any([rule.resource_ids, rule.tags]):
if any([rule.resource_id_regex_list, rule.tags]):
matching_findings = [finding for finding in findings
if any([finding.is_matching_resource_ids(rule.resource_ids),
if any([finding.is_matching_resource_ids(rule.resource_id_regex_list),
finding.is_matching_tags(rule.tags)])]
logger.debug(f'Following findings matched with rule with note: "{rule.note}", '
f'{[finding.id for finding in matching_findings]}')
Expand Down Expand Up @@ -1015,7 +1015,7 @@ def suppress_findings_on_matching_rules(self, finding_data: Union[List[Dict], Di
self._logger.error(f'Data {payload} seems to be invalid.')
return self._workflow_state_change_on_findings([finding for finding in findings if finding])

def get_unmanaged_suppressed_findings(self) -> [Finding]:
def get_unmanaged_suppressed_findings(self) -> List[Finding]:
"""Retrieves a list of suppressed findings that are not managed by this library.
Returns:
Expand Down
6 changes: 3 additions & 3 deletions awsfindingsmanagerlib/validations.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,16 +47,16 @@
__email__ = '''<[email protected]>,<[email protected]>,<[email protected]>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".

rule_schema = Schema({'match_on': {Optional('control_id'): str,
rule_schema = Schema({'match_on': {Optional('rule_or_control_id'): str,
Optional('security_control_id'): str,
Optional('resource_ids'): [str],
Optional('resource_id_regex_list'): [str],
Optional('tags'): [{'key': str,
'value': str}]},
'note': str,
'action': lambda x: x in ('SUPPRESSED',)})

RULE_SUPPORTED_ACTIONS = ('SUPPRESSED',)
RULE_MUTUALLY_EXCLUSIVE = [('security_control_id', 'control_id')]
RULE_MUTUALLY_EXCLUSIVE = [('security_control_id', 'rule_or_control_id')]


def validate_rule_data(rule_data) -> Dict:
Expand Down
2 changes: 1 addition & 1 deletion tests/fixtures/batch_update_findings.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,4 @@
"Text": "Maybe later",
"UpdatedBy": "FindingsManager"
}
}
}
2 changes: 1 addition & 1 deletion tests/fixtures/findings.json
Original file line number Diff line number Diff line change
Expand Up @@ -187,4 +187,4 @@
"AwsAccountName": "account2"
}
]
}
}
20 changes: 10 additions & 10 deletions tests/fixtures/suppressions.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Comments
Rules:
- note: 'Maybe later'
action: 'SUPPRESSED'
match_on:
security_control_id: 'GuardDuty.1'
- note: 'Public'
action: 'SUPPRESSED'
match_on:
resource_ids:
- '^arn:aws:s3:::public-bucket$'
- '^arn:aws:s3:::bucket-public$'
- note: 'Maybe later'
action: 'SUPPRESSED'
match_on:
security_control_id: 'GuardDuty.1'
- note: 'Public'
action: 'SUPPRESSED'
match_on:
resource_id_regex_list:
- '^arn:aws:s3:::public-bucket$'
- '^arn:aws:s3:::bucket-public$'

0 comments on commit 230bc0d

Please sign in to comment.