Skip to content

Commit

Permalink
rework intsights to split csv and json (#9855)
Browse files Browse the repository at this point in the history
  • Loading branch information
manuel-sommer authored Apr 8, 2024
1 parent 0a8eb43 commit c7ce764
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 151 deletions.
95 changes: 95 additions & 0 deletions dojo/tools/intsights/csv_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import collections
import csv
import io
import logging


class IntSightsCSVParser:
    def _parse_csv(self, csv_file) -> list[dict]:
        """
        Parse entries from the CSV file object into a list of alerts.

        Args:
            csv_file: The CSV file object to parse (its ``read()`` may yield
                either ``str`` or UTF-8 ``bytes``)
        Returns:
            A list of alert dicts; alerts missing a required value are dropped,
            and an empty list is returned when the headers don't match.
        """
        # Exact header set an IntSights CSV export is expected to contain.
        default_keys = [
            "Alert ID",
            "Title",
            "Description",
            "Severity",
            "Type",
            "Source Date (UTC)",
            "Report Date (UTC)",
            "Network Type",
            "Source URL",
            "Source Name",
            "Assets",
            "Tags",
            "Assignees",
            "Remediation",
            "Status",
            "Closed Reason",
            "Additional Info",
            "Rating",
            "Alert Link",
        ]

        # These keys require a value. If one or more of the values is null or
        # empty, the entire alert is ignored.
        # This is to avoid attempting to import incomplete Findings.
        required_keys = ["alert_id", "title", "severity", "status"]

        alerts = []

        content = csv_file.read()
        if isinstance(content, bytes):
            content = content.decode("utf-8")
        csv_reader = csv.DictReader(
            io.StringIO(content), delimiter=",", quotechar='"'
        )

        # Don't bother parsing if the keys don't match exactly what's expected
        # (Counter comparison ignores column order).
        if collections.Counter(default_keys) == collections.Counter(
            csv_reader.fieldnames
        ):
            default_value = "None provided"
            for alert in csv_reader:
                # Rename the raw CSV columns to the normalized keys the
                # importer consumes downstream.
                alert["alert_id"] = alert.pop("Alert ID")
                alert["title"] = alert.pop("Title")
                alert["description"] = alert.pop("Description")
                alert["severity"] = alert.pop("Severity")
                alert["type"] = alert.pop("Type")
                alert["source_date"] = alert.pop(
                    "Source Date (UTC)", default_value
                )
                alert["report_date"] = alert.pop(
                    "Report Date (UTC)", default_value
                )
                alert["network_type"] = alert.pop(
                    "Network Type", default_value
                )
                alert["source_url"] = alert.pop("Source URL", default_value)
                alert["assets"] = alert.pop("Assets", default_value)
                alert["tags"] = alert.pop("Tags", default_value)
                alert["status"] = alert.pop("Status", default_value)
                alert["alert_link"] = alert.pop("Alert Link")
                # Columns that are dropped and not carried into the Finding.
                alert.pop("Assignees")
                alert.pop("Remediation")
                alert.pop("Closed Reason")
                alert.pop("Rating")
                # Keep the alert only when every required value is non-empty.
                if all(alert[key] for key in required_keys):
                    alerts.append(alert)
        else:
            # Bug fix: this class has no `_LOGGER` attribute, so the previous
            # `self._LOGGER.error(...)` raised AttributeError on this path.
            logging.getLogger(__name__).error(
                "The CSV file has one or more missing or unexpected header values"
            )

        return alerts
51 changes: 51 additions & 0 deletions dojo/tools/intsights/json_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import json


class IntSightsJSONParser:
    def _parse_json(self, json_file) -> list[dict]:
        """
        Parse entries from the JSON file object into a list of alerts.

        Args:
            json_file: The JSON file object to parse
        Returns:
            A list of alert dicts, one per entry of the top-level "Alerts" key
            (empty list when that key is absent).
        """
        alerts = []

        original_alerts = json.load(json_file)
        for original_alert in original_alerts.get("Alerts", []):
            details = original_alert["Details"]
            source = details["Source"]
            alert = {
                "alert_id": original_alert["_id"],
                "title": details["Title"],
                "description": details["Description"],
                "severity": details["Severity"],
                "type": details["Type"],
                # Source/report dates are optional in the export.
                "source_date": source.get("Date", "None provided"),
                "report_date": original_alert.get("FoundDate", "None provided"),
                "network_type": source.get("NetworkType"),
                "source_url": source.get("URL"),
                # Flatten the asset values to a single comma-separated string.
                "assets": ",".join(
                    item.get("Value") for item in original_alert["Assets"]
                ),
                "tags": details.get("Tags"),
                "status": (
                    "Closed"
                    if original_alert["Closed"].get("IsClosed")
                    else "Open"
                ),
                # Deep link back to the alert in the IntSights dashboard.
                "alert_link": (
                    "https://dashboard.intsights.com/#/threat-command/alerts?search="
                    f'{original_alert["_id"]}'
                ),
            }

            alerts.append(alert)

        return alerts
155 changes: 4 additions & 151 deletions dojo/tools/intsights/parser.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import collections
import csv
import io
import json
import logging

from dojo.models import Finding
from dojo.tools.intsights.csv_handler import IntSightsCSVParser
from dojo.tools.intsights.json_handler import IntSightsJSONParser


class IntSightsParser:
Expand All @@ -23,144 +20,6 @@ def get_label_for_scan_types(self, scan_type):
def get_description_for_scan_types(self, scan_type):
    """Return the user-facing description for this scan type."""
    description = "IntSights report file can be imported in JSON format."
    return description

def _parse_json(self, json_file) -> [dict]:
"""
Parses entries from the JSON object into a list of alerts
Args:
json_file: The JSON file object to parse
Returns:
A list of alerts [dict()]
"""
alerts = []

original_alerts = json.load(json_file)
for original_alert in original_alerts.get("Alerts", []):
alert = dict()
alert["alert_id"] = original_alert["_id"]
alert["title"] = original_alert["Details"]["Title"]
alert["description"] = original_alert["Details"]["Description"]
alert["severity"] = original_alert["Details"]["Severity"]
alert["type"] = original_alert["Details"]["Type"]
alert["source_date"] = original_alert["Details"]["Source"].get(
"Date", "None provided"
)
alert["report_date"] = original_alert.get(
"FoundDate", "None provided"
)
alert["network_type"] = original_alert["Details"]["Source"].get(
"NetworkType"
)
alert["source_url"] = original_alert["Details"]["Source"].get(
"URL"
)
alert["assets"] = ",".join(
[item.get("Value") for item in original_alert["Assets"]]
)
alert["tags"] = original_alert["Details"].get("Tags")
alert["status"] = (
"Closed"
if original_alert["Closed"].get("IsClosed")
else "Open"
)
alert["alert_link"] = (
f"https://dashboard.intsights.com/#/threat-command/alerts?search="
f'{original_alert["_id"]}'
)

alerts.append(alert)

return alerts

def _parse_csv(self, csv_file) -> [dict]:
"""
Parses entries from the CSV file object into a list of alerts
Args:
csv_file: The JSON file object to parse
Returns:
A list of alerts [dict()]
"""
default_keys = [
"Alert ID",
"Title",
"Description",
"Severity",
"Type",
"Source Date (UTC)",
"Report Date (UTC)",
"Network Type",
"Source URL",
"Source Name",
"Assets",
"Tags",
"Assignees",
"Remediation",
"Status",
"Closed Reason",
"Additional Info",
"Rating",
"Alert Link"
]

# These keys require a value. If one or more of the values is null or empty, the entire Alert is ignored.
# This is to avoid attempting to import incomplete Findings.
required_keys = ["alert_id", "title", "severity", "status"]

alerts = []
invalid_alerts = []

content = csv_file.read()
if isinstance(content, bytes):
content = content.decode("utf-8")
csv_reader = csv.DictReader(
io.StringIO(content), delimiter=",", quotechar='"'
)

# Don't bother parsing if the keys don't match exactly what's expected
if collections.Counter(default_keys) == collections.Counter(
csv_reader.fieldnames
):
default_valud = "None provided"
for alert in csv_reader:
alert["alert_id"] = alert.pop("Alert ID")
alert["title"] = alert.pop("Title")
alert["description"] = alert.pop("Description")
alert["severity"] = alert.pop("Severity")
alert["type"] = alert.pop(
"Type",
)
alert["source_date"] = alert.pop(
"Source Date (UTC)", default_valud
)
alert["report_date"] = alert.pop(
"Report Date (UTC)", default_valud
)
alert["network_type"] = alert.pop(
"Network Type", default_valud
)
alert["source_url"] = alert.pop("Source URL", default_valud)
alert["assets"] = alert.pop("Assets", default_valud)
alert["tags"] = alert.pop("Tags", default_valud)
alert["status"] = alert.pop("Status", default_valud)
alert["alert_link"] = alert.pop("Alert Link")
alert.pop("Assignees")
alert.pop("Remediation")
alert.pop("Closed Reason")
alert.pop("Rating")
for key in required_keys:
if not alert[key]:
invalid_alerts.append(alert)

if alert not in invalid_alerts:
alerts.append(alert)
else:
self._LOGGER.error(
"The CSV file has one or more missing or unexpected header values"
)

return alerts

def _build_finding_description(self, alert: dict) -> str:
"""
Builds an IntSights Finding description from various pieces of information.
Expand All @@ -185,21 +44,18 @@ def _build_finding_description(self, alert: dict) -> str:

def get_findings(self, file, test):
duplicates = dict()

if file.name.lower().endswith(".json"):
alerts = self._parse_json(
alerts = IntSightsJSONParser()._parse_json(
file,
)
elif file.name.lower().endswith(".csv"):
alerts = self._parse_csv(file)
alerts = IntSightsCSVParser()._parse_csv(file)
else:
raise ValueError(
"Filename extension not recognized. Use .json or .csv"
)

for alert in alerts:
dupe_key = alert["alert_id"]

alert = Finding(
title=alert["title"],
test=test,
Expand All @@ -212,10 +68,7 @@ def get_findings(self, file, test):
dynamic_finding=True,
unique_id_from_tool=alert["alert_id"]
)

duplicates[dupe_key] = alert

if dupe_key not in duplicates:
duplicates[dupe_key] = True

return duplicates.values()

0 comments on commit c7ce764

Please sign in to comment.