Skip to content

Commit

Permalink
Refactor awssecurityhub and add endpoint (#9814)
Browse files Browse the repository at this point in the history
* rework_awssecurityhub

* add endpoints for inspector

* add endpoints for guardduty

* 💄
  • Loading branch information
manuel-sommer authored Mar 28, 2024
1 parent 32398f6 commit 97a8a56
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 142 deletions.
72 changes: 72 additions & 0 deletions dojo/tools/awssecurityhub/compliance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from datetime import datetime
from dojo.models import Finding


class Compliance(object):
def get_item(self, finding: dict, test):
finding_id = finding.get("Id", "")
title = finding.get("Title", "")
severity = finding.get("Severity", {}).get("Label", "INFORMATIONAL").title()
mitigation = ""
impact = []
references = []
unsaved_vulnerability_ids = []
epss_score = None
mitigation = finding.get("Remediation", {}).get("Recommendation", {}).get("Text", "")
description = "This is a Security Hub Finding \n" + finding.get("Description", "")
if finding.get("Compliance", {}).get("Status", "PASSED") == "PASSED":
is_Mitigated = True
active = False
if finding.get("LastObservedAt", None):
try:
mitigated = datetime.strptime(finding.get("LastObservedAt"), "%Y-%m-%dT%H:%M:%S.%fZ")
except Exception:
mitigated = datetime.strptime(finding.get("LastObservedAt"), "%Y-%m-%dT%H:%M:%fZ")
else:
mitigated = datetime.utcnow()
else:
mitigated = None
is_Mitigated = False
active = True
title_suffix = ""
for resource in finding.get("Resources", []):
component_name = resource.get("Type")
if component_name == "AwsEcrContainerImage":
details = resource.get("Details", {}).get("AwsEcrContainerImage")
arn = resource.get("Id")
if details:
impact.append(f"Image ARN: {arn}")
impact.append(f"Registry: {details.get('RegistryId')}")
impact.append(f"Repository: {details.get('RepositoryName')}")
impact.append(f"Image digest: {details.get('ImageDigest')}")
title_suffix = f" - Image: {arn.split('/', 1)[1]}" # repo-name/sha256:digest
else: # generic implementation
resource_id = resource["Id"].split(":")[-1]
impact.append(f"Resource: {resource_id}")
title_suffix = f" - Resource: {resource_id}"
if remediation_rec_url := finding.get("Remediation", {}).get("Recommendation", {}).get("Url"):
references.append(remediation_rec_url)
false_p = False
result = Finding(
title=f"{title}{title_suffix}",
test=test,
description=description,
mitigation=mitigation,
references="\n".join(references),
severity=severity,
impact="\n".join(impact),
active=active,
verified=False,
false_p=false_p,
unique_id_from_tool=finding_id,
mitigated=mitigated,
is_mitigated=is_Mitigated,
static_finding=True,
dynamic_finding=False,
component_name=component_name,
)
if epss_score is not None:
result.epss_score = epss_score
# Add the unsaved vulnerability ids
result.unsaved_vulnerability_ids = unsaved_vulnerability_ids
return result
82 changes: 82 additions & 0 deletions dojo/tools/awssecurityhub/guardduty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
from datetime import datetime
from dojo.models import Finding, Endpoint


class GuardDuty(object):
def get_item(self, finding: dict, test):
finding_id = finding.get("Id", "")
title = finding.get("Title", "")
severity = finding.get("Severity", {}).get("Label", "INFORMATIONAL").title()
mitigation = ""
impact = []
references = []
unsaved_vulnerability_ids = []
epss_score = None
mitigations = finding.get("FindingProviderFields", {}).get("Types")
for mitigate in mitigations:
mitigation += mitigate + "\n"
mitigation += "https://docs.aws.amazon.com/guardduty/latest/ug/guardduty_finding-types-active.html"
active = True
if finding.get("RecordState") == "ACTIVE":
is_Mitigated = False
mitigated = None
else:
is_Mitigated = True
if finding.get("LastObservedAt", None):
try:
mitigated = datetime.strptime(finding.get("LastObservedAt"), "%Y-%m-%dT%H:%M:%S.%fZ")
except Exception:
mitigated = datetime.strptime(finding.get("LastObservedAt"), "%Y-%m-%dT%H:%M:%fZ")
else:
mitigated = datetime.utcnow()
description = f"This is a GuardDuty Finding\n{finding.get('Description', '')}"
description += f"SourceURL: {finding.get('SourceUrl', '')}\n"
description += f"AwsAccountId: {finding.get('AwsAccountId', '')}\n"
description += f"Region: {finding.get('Region', '')}\n"
title_suffix = ""
hosts = list()
for resource in finding.get("Resources", []):
component_name = resource.get("Type")
if component_name in ("AwsEcrContainerImage", "AwsEc2Instance"):
hosts.append(Endpoint(host=f"{component_name} {resource.get('Id')}"))
if component_name == "AwsEcrContainerImage":
details = resource.get("Details", {}).get("AwsEcrContainerImage")
arn = resource.get("Id")
if details:
impact.append(f"Image ARN: {arn}")
impact.append(f"Registry: {details.get('RegistryId')}")
impact.append(f"Repository: {details.get('RepositoryName')}")
impact.append(f"Image digest: {details.get('ImageDigest')}")
title_suffix = f" - Image: {arn.split('/', 1)[1]}" # repo-name/sha256:digest
else: # generic implementation
resource_id = resource["Id"].split(":")[-1]
impact.append(f"Resource: {resource_id}")
title_suffix = f" - Resource: {resource_id}"
if remediation_rec_url := finding.get("Remediation", {}).get("Recommendation", {}).get("Url"):
references.append(remediation_rec_url)
false_p = False
result = Finding(
title=f"{title}{title_suffix}",
test=test,
description=description,
mitigation=mitigation,
references="\n".join(references),
severity=severity,
impact="\n".join(impact),
active=active,
verified=False,
false_p=false_p,
unique_id_from_tool=finding_id,
mitigated=mitigated,
is_mitigated=is_Mitigated,
static_finding=True,
dynamic_finding=False,
component_name=component_name,
)
result.unsaved_endpoints = list()
result.unsaved_endpoints.extend(hosts)
if epss_score is not None:
result.epss_score = epss_score
# Add the unsaved vulnerability ids
result.unsaved_vulnerability_ids = unsaved_vulnerability_ids
return result
94 changes: 94 additions & 0 deletions dojo/tools/awssecurityhub/inspector.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
from datetime import datetime
from dojo.models import Finding, Endpoint


class Inspector(object):
def get_item(self, finding: dict, test):
finding_id = finding.get("Id", "")
title = finding.get("Title", "")
severity = finding.get("Severity", {}).get("Label", "INFORMATIONAL").title()
mitigation = ""
impact = []
references = []
unsaved_vulnerability_ids = []
epss_score = None
description = f"This is an Inspector Finding\n{finding.get('Description', '')}"
vulnerabilities = finding.get("Vulnerabilities", [])
for vulnerability in vulnerabilities:
# Save the CVE if it is present
if cve := vulnerability.get("Id"):
unsaved_vulnerability_ids.append(cve)
for alias in vulnerability.get("RelatedVulnerabilities", []):
if alias != cve:
unsaved_vulnerability_ids.append(alias)
# Add information about the vulnerable packages to the description and mitigation
vulnerable_packages = vulnerability.get("VulnerablePackages", [])
for package in vulnerable_packages:
mitigation += f"- Update {package.get('Name', '')}-{package.get('Version', '')}\n"
if remediation := package.get("Remediation"):
mitigation += f"\t- {remediation}\n"
if vendor := vulnerability.get("Vendor"):
if vendor_url := vendor.get("Url"):
references.append(vendor_url)
if vulnerability.get("EpssScore") is not None:
epss_score = vulnerability.get("EpssScore")
if finding.get("ProductFields", {}).get("aws/inspector/FindingStatus", "ACTIVE") == "ACTIVE":
mitigated = None
is_Mitigated = False
active = True
else:
is_Mitigated = True
active = False
if finding.get("LastObservedAt", None):
try:
mitigated = datetime.strptime(finding.get("LastObservedAt"), "%Y-%m-%dT%H:%M:%S.%fZ")
except Exception:
mitigated = datetime.strptime(finding.get("LastObservedAt"), "%Y-%m-%dT%H:%M:%fZ")
else:
mitigated = datetime.utcnow()
title_suffix = ""
hosts = list()
for resource in finding.get("Resources", []):
component_name = resource.get("Type")
hosts.append(Endpoint(host=f"{component_name} {resource.get('Id')}"))
if component_name == "AwsEcrContainerImage":
details = resource.get("Details", {}).get("AwsEcrContainerImage")
arn = resource.get("Id")
if details:
impact.append(f"Image ARN: {arn}")
impact.append(f"Registry: {details.get('RegistryId')}")
impact.append(f"Repository: {details.get('RepositoryName')}")
impact.append(f"Image digest: {details.get('ImageDigest')}")
title_suffix = f" - Image: {arn.split('/', 1)[1]}" # repo-name/sha256:digest
else: # generic implementation
resource_id = resource["Id"].split(":")[-1]
impact.append(f"Resource: {resource_id}")
title_suffix = f" - Resource: {resource_id}"
if remediation_rec_url := finding.get("Remediation", {}).get("Recommendation", {}).get("Url"):
references.append(remediation_rec_url)
false_p = False
result = Finding(
title=f"{title}{title_suffix}",
test=test,
description=description,
mitigation=mitigation,
references="\n".join(references),
severity=severity,
impact="\n".join(impact),
active=active,
verified=False,
false_p=false_p,
unique_id_from_tool=finding_id,
mitigated=mitigated,
is_mitigated=is_Mitigated,
static_finding=True,
dynamic_finding=False,
component_name=component_name,
)
result.unsaved_endpoints = list()
result.unsaved_endpoints.extend(hosts)
if epss_score is not None:
result.epss_score = epss_score
# Add the unsaved vulnerability ids
result.unsaved_vulnerability_ids = unsaved_vulnerability_ids
return result
Loading

0 comments on commit 97a8a56

Please sign in to comment.