-
Notifications
You must be signed in to change notification settings - Fork 1.6k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
fb93455
commit 4ad0d26
Showing
2 changed files
with
238 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,191 @@ | ||
import json | ||
import textwrap | ||
from datetime import datetime | ||
|
||
from dojo.models import Finding | ||
from html2text import html2text | ||
|
||
|
||
class AWSScout2Parser: | ||
# FIXME bad very bad | ||
item_data = "" | ||
pdepth = 0 | ||
|
||
def get_scan_types(self): | ||
return ["AWS Scout2 Scan"] | ||
|
||
def get_label_for_scan_types(self, scan_type): | ||
return "AWS Scout2 Scan" | ||
|
||
def get_description_for_scan_types(self, scan_type): | ||
return "JS file in scout2-report/inc-awsconfig/aws_config.js." | ||
|
||
def get_findings(self, filename, test): | ||
content = filename.read() | ||
if isinstance(content, bytes): | ||
content = content.decode("utf-8") | ||
raw_data = content.replace("aws_info =", "") | ||
data = json.loads(raw_data) | ||
find_date = datetime.now() | ||
dupes = {} | ||
|
||
test_description = "" | ||
aws_account_id = data["aws_account_id"] | ||
test_description = f"{test_description} **AWS Account:** {aws_account_id}\n" | ||
last_run = data["last_run"] | ||
test_description = "{} **Ruleset:** {}\n".format( | ||
test_description, | ||
last_run["ruleset_name"], | ||
) | ||
test_description = "{} **Ruleset Description:** {}\n".format( | ||
test_description, | ||
last_run["ruleset_about"], | ||
) | ||
test_description = "{} **Command:** {}\n".format( | ||
test_description, | ||
last_run["cmd"], | ||
) | ||
|
||
# Summary for AWS Services | ||
test_description = "%s\n**AWS Services** \n\n" % (test_description) | ||
for service, items in list(last_run["summary"].items()): | ||
test_description = f"{test_description}\n**{service.upper()}** \n" | ||
test_description = "{}\n* **Checked Items:** {}\n".format( | ||
test_description, | ||
items["checked_items"], | ||
) | ||
test_description = "{}* **Flagged Items:** {}\n".format( | ||
test_description, | ||
items["flagged_items"], | ||
) | ||
test_description = "{}* **Max Level:** {}\n".format( | ||
test_description, | ||
items["max_level"], | ||
) | ||
test_description = "{}* **Resource Count:** {}\n".format( | ||
test_description, | ||
items["resources_count"], | ||
) | ||
test_description = "{}* **Rules Count:** {}\n\n".format( | ||
test_description, | ||
items["rules_count"], | ||
) | ||
test.description = test_description | ||
test.save() | ||
|
||
scout2_findings = [] | ||
|
||
# Configured AWS Services | ||
for service in list(data["services"].items()): | ||
for service_item in service: | ||
if "findings" in service_item: | ||
for name, finding in list( | ||
service_item["findings"].items() | ||
): | ||
if finding["items"]: | ||
description_text = "" | ||
for name in finding["items"]: | ||
description_text = ( | ||
description_text | ||
+ "**Location:** " | ||
+ name | ||
+ "\n\n---\n" | ||
) | ||
description_text = description_text + "\n" | ||
key = name.split(".") | ||
i = 1 | ||
lookup = service_item | ||
while i < len(key): | ||
if key[i] in lookup: | ||
if isinstance(lookup[key[i]], dict): | ||
lookup = lookup[key[i]] | ||
if ( | ||
key[i - 1] == "security_groups" | ||
or key[i - 1] | ||
== "PolicyDocument" | ||
): | ||
break | ||
i = i + 1 | ||
|
||
self.recursive_print(lookup) | ||
description_text = ( | ||
description_text + self.item_data | ||
) | ||
self.item_data = "" | ||
|
||
mobsf_item = { | ||
"category": "Mobile Permissions", | ||
"title": finding["description"], | ||
"severity": finding["level"], | ||
"description": description_text, | ||
} | ||
scout2_findings.append(mobsf_item) | ||
|
||
for scout2_finding in scout2_findings: | ||
title = html2text(scout2_finding["title"]) | ||
sev = self.getCriticalityRating(scout2_finding["severity"]) | ||
description = scout2_finding["description"] | ||
dupe_key = sev + title | ||
if dupe_key in dupes: | ||
find = dupes[dupe_key] | ||
if description is not None: | ||
find.description += description | ||
else: | ||
find = Finding( | ||
title=textwrap.shorten(title, 150), | ||
cwe=1032, # Security Configuration Weaknesses, would like to fine tune | ||
test=test, | ||
description="**AWS Account:** " | ||
+ aws_account_id | ||
+ "\n" | ||
+ description, | ||
severity=sev, | ||
references=None, | ||
date=find_date, | ||
dynamic_finding=True, | ||
) | ||
dupes[dupe_key] = find | ||
return list(dupes.values()) | ||
|
||
def formatview(self, depth): | ||
if depth > 1: | ||
return "* " | ||
else: | ||
return "" | ||
|
||
def recursive_print(self, src, depth=0, key=""): | ||
def tabs(n): | ||
return " " * n * 2 | ||
|
||
if isinstance(src, dict): | ||
for key, value in src.items(): | ||
if isinstance(src, str): | ||
self.item_data = self.item_data + key + "\n" | ||
self.recursive_print(value, depth + 1, key) | ||
elif isinstance(src, list): | ||
for litem in src: | ||
self.recursive_print(litem, depth + 2) | ||
else: | ||
if self.pdepth != depth: | ||
self.item_data = self.item_data + "\n" | ||
if key: | ||
self.item_data = ( | ||
self.item_data | ||
+ self.formatview(depth) | ||
+ f"**{key.title()}:** {src}\n\n" | ||
) | ||
else: | ||
self.item_data = ( | ||
self.item_data + self.formatview(depth) + "%s\n" % src | ||
) | ||
self.pdepth = depth | ||
|
||
# Criticality rating | ||
def getCriticalityRating(self, rating): | ||
criticality = "Info" | ||
if rating == "warning": | ||
criticality = "Medium" | ||
elif rating == "danger": | ||
criticality = "Critical" | ||
|
||
return criticality |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
from ..dojo_test_case import DojoTestCase | ||
from dojo.tools.aws_scout2.parser import AWSScout2Parser | ||
from django.utils import timezone | ||
from dojo.models import Test, Engagement, Product, Product_Type, Test_Type | ||
|
||
|
||
class TestAwsProwlerParser(DojoTestCase): | ||
def setup(self, testfile): | ||
product_type = Product_Type(critical_product=True, key_product=False) | ||
product_type.save() | ||
|
||
test_type = Test_Type(static_tool=True, dynamic_tool=False) | ||
test_type.save() | ||
|
||
product = Product(prod_type=product_type) | ||
product.save() | ||
|
||
engagement = Engagement( | ||
product=product, target_start=timezone.now(), target_end=timezone.now() | ||
) | ||
engagement.save() | ||
|
||
parser = AWSScout2Parser() | ||
findings = parser.get_findings( | ||
testfile, | ||
Test( | ||
engagement=engagement, | ||
test_type=test_type, | ||
target_start=timezone.now(), | ||
target_end=timezone.now(), | ||
), | ||
) | ||
|
||
testfile.close() | ||
|
||
return findings | ||
|
||
def test_parser_with_critical_vuln_has_one_findings(self): | ||
findings = self.setup(open("unittests/scans/aws_scout2/aws_config.js")) | ||
self.assertEqual(21, len(findings)) | ||
self.assertEqual("Global services logging disabled", findings[0].title) | ||
self.assertEqual("Critical", findings[0].severity) | ||
self.assertEqual(1032, findings[0].cwe) | ||
self.assertEqual("Unused security groups", findings[6].title) | ||
self.assertEqual("Medium", findings[6].severity) | ||
self.assertEqual(1032, findings[6].cwe) | ||
|