From 71e858c89c030e1235c8887ace2fcce7608c2f36 Mon Sep 17 00:00:00 2001 From: DefectDojo release bot Date: Mon, 16 Dec 2024 16:04:37 +0000 Subject: [PATCH 1/6] Update versions in application files --- components/package.json | 2 +- helm/defectdojo/Chart.yaml | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/components/package.json b/components/package.json index 590f1cb37e0..febe451775d 100644 --- a/components/package.json +++ b/components/package.json @@ -1,6 +1,6 @@ { "name": "defectdojo", - "version": "2.41.2", + "version": "2.42.0-dev", "license" : "BSD-3-Clause", "private": true, "dependencies": { diff --git a/helm/defectdojo/Chart.yaml b/helm/defectdojo/Chart.yaml index f76daab65f6..ab66f338320 100644 --- a/helm/defectdojo/Chart.yaml +++ b/helm/defectdojo/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 -appVersion: "2.41.2" +appVersion: "2.42.0-dev" description: A Helm chart for Kubernetes to install DefectDojo name: defectdojo -version: 1.6.164 +version: 1.6.165-dev icon: https://www.defectdojo.org/img/favicon.ico maintainers: - name: madchap From 2f79dc6f90c3eeac50f71f022604ccf20b1fddcb Mon Sep 17 00:00:00 2001 From: Cody Maffucci <46459665+Maffooch@users.noreply.github.com> Date: Tue, 17 Dec 2024 22:07:24 -0600 Subject: [PATCH 2/6] Notifications: Convert to classes (#11296) * Struggle bussing * Getting tests sorted out * Some tweaks * Formatting * Update mocks * Correct ruff * Update dojo/notifications/helper.py Co-authored-by: Charles Neill <1749665+cneill@users.noreply.github.com> * Update dojo/notifications/helper.py Co-authored-by: Charles Neill <1749665+cneill@users.noreply.github.com> * Update dojo/notifications/helper.py Co-authored-by: Charles Neill <1749665+cneill@users.noreply.github.com> * Update dojo/notifications/helper.py Co-authored-by: Charles Neill <1749665+cneill@users.noreply.github.com> * Make `no_users` default to False in more than one place * Last ruff fix --------- Co-authored-by: Charles Neill <1749665+cneill@users.noreply.github.com> --- dojo/engagement/views.py | 11 +- dojo/importers/base_importer.py | 37 + dojo/importers/default_importer.py | 15 +- dojo/importers/default_reimporter.py | 3 +- dojo/notifications/helper.py | 1287 ++++++++++++++++---------- dojo/notifications/views.py | 9 +- unittests/test_notifications.py | 145 +-- 7 files changed, 950 insertions(+), 557 deletions(-) diff --git a/dojo/engagement/views.py b/dojo/engagement/views.py index 70ff8a7b160..3b515c7468a 100644 --- a/dojo/engagement/views.py +++ b/dojo/engagement/views.py @@ -28,7 +28,6 @@ from openpyxl.styles import Font import dojo.jira_link.helper as jira_helper -import dojo.notifications.helper as notifications_helper import dojo.risk_acceptance.helper as ra_helper from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.authorization_decorators import user_is_authorized @@ -653,7 +652,15 @@ def add_tests(request, eid): "Test added successfully.", extra_tags="alert-success") - notifications_helper.notify_test_created(new_test) + create_notification( + event="test_added", + title=f"Test created for {new_test.engagement.product}: {new_test.engagement.name}: {new_test}", + test=new_test, + engagement=new_test.engagement, + product=new_test.engagement.product, + url=reverse("view_test", args=(new_test.id,)), + url_api=reverse("test-detail", args=(new_test.id,)), + ) if "_Add Another Test" in request.POST: return HttpResponseRedirect( diff --git a/dojo/importers/base_importer.py b/dojo/importers/base_importer.py index c9a77fbb95b..cab58fd718b 100644 --- a/dojo/importers/base_importer.py +++ b/dojo/importers/base_importer.py @@ -5,6 +5,7 @@ from django.core.exceptions import ValidationError from django.core.files.base import ContentFile from django.core.files.uploadedfile import TemporaryUploadedFile +from django.urls import reverse from django.utils.timezone import make_aware import dojo.finding.helper as finding_helper @@ -28,6 +29,7 @@ Test_Type, Vulnerability_Id, ) +from dojo.notifications.helper import create_notification from dojo.tools.factory import get_parser from dojo.utils import max_safe @@ -719,3 +721,38 @@ def mitigate_finding( finding.save(dedupe_option=False) else: finding.save(dedupe_option=False, push_to_jira=self.push_to_jira) + + def notify_scan_added( + self, + test, + updated_count, + new_findings=[], + findings_mitigated=[], + findings_reactivated=[], + findings_untouched=[], + ): + logger.debug("Scan added notifications") + + new_findings = sorted(new_findings, key=lambda x: x.numerical_severity) + findings_mitigated = sorted(findings_mitigated, key=lambda x: x.numerical_severity) + findings_reactivated = sorted(findings_reactivated, key=lambda x: x.numerical_severity) + findings_untouched = sorted(findings_untouched, key=lambda x: x.numerical_severity) + + title = ( + f"Created/Updated {updated_count} findings for {test.engagement.product}: {test.engagement.name}: {test}" + ) + + create_notification( + event="scan_added_empty" if updated_count == 0 else "scan_added", + title=title, + findings_new=new_findings, + findings_mitigated=findings_mitigated, + findings_reactivated=findings_reactivated, + finding_count=updated_count, + test=test, + engagement=test.engagement, + product=test.engagement.product, + findings_untouched=findings_untouched, + url=reverse("view_test", args=(test.id,)), + url_api=reverse("test-detail", args=(test.id,)), + ) diff --git a/dojo/importers/default_importer.py b/dojo/importers/default_importer.py index 95254ef59b8..3ac31143792 100644 --- a/dojo/importers/default_importer.py +++ b/dojo/importers/default_importer.py @@ -3,10 +3,10 @@ from django.core.files.uploadedfile import TemporaryUploadedFile from django.core.serializers import deserialize, serialize from django.db.models.query_utils import Q +from django.urls import reverse import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper -import dojo.notifications.helper as notifications_helper from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -15,6 +15,7 @@ Test, Test_Import, ) +from dojo.notifications.helper import create_notification logger = logging.getLogger(__name__) deduplicationLogger = logging.getLogger("dojo.specific-loggers.deduplication") @@ -126,9 +127,17 @@ def process_scan( ) # Send out some notifications to the user logger.debug("IMPORT_SCAN: Generating notifications") - notifications_helper.notify_test_created(self.test) + create_notification( + event="test_added", + title=f"Test created for {self.test.engagement.product}: {self.test.engagement.name}: {self.test}", + test=self.test, + engagement=self.test.engagement, + product=self.test.engagement.product, + url=reverse("view_test", args=(self.test.id,)), + url_api=reverse("test-detail", args=(self.test.id,)), + ) updated_count = len(new_findings) + len(closed_findings) - notifications_helper.notify_scan_added( + self.notify_scan_added( self.test, updated_count, new_findings=new_findings, diff --git a/dojo/importers/default_reimporter.py b/dojo/importers/default_reimporter.py index 9debf4aabaa..0c4159ed669 100644 --- a/dojo/importers/default_reimporter.py +++ b/dojo/importers/default_reimporter.py @@ -6,7 +6,6 @@ import dojo.finding.helper as finding_helper import dojo.jira_link.helper as jira_helper -import dojo.notifications.helper as notifications_helper from dojo.importers.base_importer import BaseImporter, Parser from dojo.importers.options import ImporterOptions from dojo.models import ( @@ -128,7 +127,7 @@ def process_scan( updated_count = ( len(closed_findings) + len(reactivated_findings) + len(new_findings) ) - notifications_helper.notify_scan_added( + self.notify_scan_added( self.test, updated_count, new_findings=new_findings, diff --git a/dojo/notifications/helper.py b/dojo/notifications/helper.py index 55281901192..3e0a0295de2 100644 --- a/dojo/notifications/helper.py +++ b/dojo/notifications/helper.py @@ -1,5 +1,7 @@ +import importlib import json import logging +from contextlib import suppress from datetime import timedelta import requests @@ -7,7 +9,7 @@ from django.conf import settings from django.core.exceptions import FieldDoesNotExist from django.core.mail import EmailMessage -from django.db.models import Count, Prefetch, Q +from django.db.models import Count, Prefetch, Q, QuerySet from django.template import TemplateDoesNotExist from django.template.loader import render_to_string from django.urls import reverse @@ -20,213 +22,282 @@ from dojo.models import ( Alerts, Dojo_User, + Engagement, + Finding, Notification_Webhooks, Notifications, + Product, + Product_Type, System_Settings, + Test, UserContactInfo, get_current_datetime, ) -from dojo.user.queries import get_authorized_users_for_product_and_product_type, get_authorized_users_for_product_type +from dojo.user.queries import ( + get_authorized_users_for_product_and_product_type, + get_authorized_users_for_product_type, +) logger = logging.getLogger(__name__) -def create_notification(event=None, **kwargs): - system_settings = System_Settings.objects.get() - kwargs["system_settings"] = system_settings - # System notifications - try: - system_notifications = Notifications.objects.get(user=None, template=False) - except Exception: - system_notifications = Notifications() - - if "recipients" in kwargs: - # mimic existing code so that when recipients is specified, no other system or personal notifications are sent. - logger.debug("creating notifications for recipients: %s", kwargs["recipients"]) - for recipient_notifications in Notifications.objects.filter(user__username__in=kwargs["recipients"], user__is_active=True, product=None): - if event in settings.NOTIFICATIONS_SYSTEM_LEVEL_TRUMP: - # merge the system level notifications with the personal level - # this allows for system to trump the personal - merged_notifications = Notifications.merge_notifications_list([system_notifications, recipient_notifications]) - merged_notifications.user = recipient_notifications.user - logger.debug("Sent notification to %s", merged_notifications.user) - process_notifications(event, merged_notifications, **kwargs) - else: - # Do not trump user preferences and send notifications as usual - logger.debug("Sent notification to %s", recipient_notifications.user) - process_notifications(event, recipient_notifications, **kwargs) - - else: - logger.debug("creating system notifications for event: %s", event) - # send system notifications to all admin users - - # parse kwargs before converting them to dicts - product_type = None - if "product_type" in kwargs: - product_type = kwargs.get("product_type") - logger.debug("Defined product type %s", product_type) +def create_notification( + event: str | None = None, + title: str | None = None, + finding: Finding | None = None, + test: Test | None = None, + engagement: Engagement | None = None, + product: Product | None = None, + requested_by: Dojo_User | None = None, + reviewers: list[Dojo_User] | list[str] | None = None, + recipients: list[Dojo_User] | list[str] | None = None, + no_users: bool = False, # noqa: FBT001 + url: str | None = None, + url_api: str | None = None, + **kwargs: dict, +) -> None: + """Create an instance of a NotificationManager and dispatch the notification.""" + default_manager = NotificationManager + notification_manager_class = default_manager + if isinstance( + ( + notification_manager := getattr( + settings, + "NOTIFICATION_MANAGER", + default_manager, + ) + ), + str, + ): + with suppress(ModuleNotFoundError): + module_name, _separator, class_name = notification_manager.rpartition(".") + module = importlib.import_module(module_name) + notification_manager_class = getattr(module, class_name) + notification_manager_class().create_notification( + event=event, + title=title, + finding=finding, + test=test, + engagement=engagement, + product=product, + requested_by=requested_by, + reviewers=reviewers, + recipients=recipients, + no_users=no_users, + url=url, + url_api=url_api, + **kwargs, + ) - product = None - if "product" in kwargs: - product = kwargs.get("product") - logger.debug("Defined product %s", product) - elif "engagement" in kwargs: - product = kwargs["engagement"].product - logger.debug("Defined product of engagement %s", product) +class NotificationManagerHelpers: - elif "test" in kwargs: - product = kwargs["test"].engagement.product - logger.debug("Defined product of test %s", product) + """Common functions for use in the Mangers.""" - elif "finding" in kwargs: - product = kwargs["finding"].test.engagement.product - logger.debug("Defined product of finding %s", product) + def __init__( + self, + *_args: list, + system_notifications: Notifications | None = None, + system_settings: System_Settings | None = None, + **_kwargs: dict, + ) -> None: + self.system_notifications = system_notifications or self._get_notifications_object() + self.system_settings = system_settings or self._get_system_settings() - elif "obj" in kwargs: - from dojo.utils import get_product - product = get_product(kwargs["obj"]) - logger.debug("Defined product of obj %s", product) - - # System notifications are sent one with user=None, which will trigger email to configured system email, to global slack channel, etc. - process_notifications(event, system_notifications, **kwargs) - - # All admins will also receive system notifications, but as part of the person global notifications section below - # This time user is set, so will trigger email to personal email, to personal slack channel (mention), etc. - # only retrieve users which have at least one notification type enabled for this event type. - logger.debug("creating personal notifications for event: %s", event) - - # There are notification like deleting a product type that shall not be sent to users. - # These notifications will have the parameter no_users=True - if not ("no_users" in kwargs and kwargs["no_users"] is True): - # get users with either global notifications, or a product specific noditiciation - # and all admin/superuser, they will always be notified - users = Dojo_User.objects.filter(is_active=True).prefetch_related(Prefetch( - "notifications_set", - queryset=Notifications.objects.filter(Q(product_id=product) | Q(product__isnull=True)), - to_attr="applicable_notifications", - )).annotate(applicable_notifications_count=Count("notifications__id", filter=Q(notifications__product_id=product) | Q(notifications__product__isnull=True)))\ - .filter(Q(applicable_notifications_count__gt=0) | Q(is_superuser=True)) - - # only send to authorized users or admin/superusers - logger.debug("Filtering users for the product %s", product) - - if product: - users = get_authorized_users_for_product_and_product_type(users, product, Permissions.Product_View) - - elif product_type: - users = get_authorized_users_for_product_type(users, product_type, Permissions.Product_Type_View) + def _get_notifications_object(self) -> Notifications: + """Set the system Notifications object on the class.""" + try: + return Notifications.objects.get(user=None, template=False) + except Exception: + return Notifications() + + def _get_system_settings(self) -> System_Settings: + """Set the system settings object in the class.""" + return System_Settings.objects.get() + + def _create_description(self, event: str, kwargs: dict) -> str: + if kwargs.get("description") is None: + if event == "product_added": + kwargs["description"] = _("Product %s has been created successfully.") % kwargs["title"] + elif event == "product_type_added": + kwargs["description"] = _("Product Type %s has been created successfully.") % kwargs["title"] else: - # nor product_type nor product defined, we should not make noise and send only notifications to admins - logger.debug("Product is not specified, making it silent") - users = users.filter(is_superuser=True) - - for user in users: - logger.debug("Authorized user for the product %s", user) - # send notifications to user after merging possible multiple notifications records (i.e. personal global + personal product) - # kwargs.update({'user': user}) - applicable_notifications = user.applicable_notifications - if user.is_superuser: - logger.debug("User %s is superuser", user) - # admin users get all system notifications - applicable_notifications.append(system_notifications) - - notifications_set = Notifications.merge_notifications_list(applicable_notifications) - notifications_set.user = user - process_notifications(event, notifications_set, **kwargs) - - -def create_description(event, *args, **kwargs): - if "description" not in kwargs: - if event == "product_added": - kwargs["description"] = _("Product %s has been created successfully.") % kwargs["title"] - elif event == "product_type_added": - kwargs["description"] = _("Product Type %s has been created successfully.") % kwargs["title"] - else: - kwargs["description"] = _("Event %s has occurred.") % str(event) - - return kwargs["description"] + kwargs["description"] = _("Event %s has occurred.") % str(event) + return kwargs["description"] -def create_notification_message(event, user, notification_type, *args, **kwargs): - template = f"notifications/{notification_type}/{event.replace('/', '')}.tpl" - kwargs.update({"user": user}) + def _create_notification_message( + self, + event: str, + user: Dojo_User, + notification_type: str, + kwargs: dict, + ) -> str: + template = f"notifications/{notification_type}/{event.replace('/', '')}.tpl" + kwargs.update({"user": user}) + notification_message = None - notification_message = None + # TODO: This may be deleted + # if (title := kwargs.get("title")) is not None: + # kwargs.update({"title": title}) - if (title := kwargs.get("title")) is not None: - kwargs.update({"title": title}) - - if kwargs.get("description") is None: - kwargs.update({"description": create_description(event, *args, **kwargs)}) - - try: - notification_message = render_to_string(template, kwargs) - logger.debug("Rendering from the template %s", template) - except TemplateDoesNotExist as e: - # In some cases, template includes another templates, if the interior one is missing, we will see it in "specifically" section - logger.debug(f"template not found or not implemented yet: {template} (specifically: {e.args})") - except Exception as e: - logger.error("error during rendering of template %s exception is %s", template, e) - finally: - if not notification_message: - kwargs["description"] = create_description(event, *args, **kwargs) - notification_message = render_to_string(f"notifications/{notification_type}/other.tpl", kwargs) - - return notification_message or "" - - -def process_notifications(event, notifications=None, **kwargs): - from dojo.utils import get_system_setting - - if not notifications: - logger.warning("no notifications!") - return - - logger.debug("sending notification " + ("asynchronously" if we_want_async() else "synchronously")) - logger.debug("process notifications for %s", notifications.user) - logger.debug("notifications: %s", vars(notifications)) - - slack_enabled = get_system_setting("enable_slack_notifications") - msteams_enabled = get_system_setting("enable_msteams_notifications") - mail_enabled = get_system_setting("enable_mail_notifications") - webhooks_enabled = get_system_setting("enable_webhooks_notifications") - - if slack_enabled and "slack" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending Slack Notification") - send_slack_notification(event, notifications.user, **kwargs) - - if msteams_enabled and "msteams" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending MSTeams Notification") - send_msteams_notification(event, notifications.user, **kwargs) - - if mail_enabled and "mail" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending Mail Notification") - send_mail_notification(event, notifications.user, **kwargs) - - if webhooks_enabled and "webhooks" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug("Sending Webhooks Notification") - send_webhooks_notification(event, notifications.user, **kwargs) - - if "alert" in getattr(notifications, event, getattr(notifications, "other")): - logger.debug(f"Sending Alert to {notifications.user}") - send_alert_notification(event, notifications.user, **kwargs) + if kwargs.get("description") is None: + kwargs.update({"description": self._create_description(event, kwargs)}) + try: + notification_message = render_to_string(template, kwargs) + logger.debug("Rendering from the template %s", template) + except TemplateDoesNotExist as e: + # In some cases, template includes another templates, if the interior one is missing, we will see it in "specifically" section + logger.debug( + f"template not found or not implemented yet: {template} (specifically: {e.args})", + ) + except Exception as e: + logger.error( + "error during rendering of template %s exception is %s", + template, + e, + ) + finally: + if not notification_message: + kwargs["description"] = self._create_description(event, kwargs) + notification_message = render_to_string( + f"notifications/{notification_type}/other.tpl", + kwargs, + ) -@dojo_async_task -@app.task -def send_slack_notification(event, user=None, *args, **kwargs): - from dojo.utils import get_system_setting + return notification_message or "" + + def _log_alert( + self, + exception: Exception, + notification_type: str | None = None, + **kwargs: dict, + ) -> None: + # no try catch here, if this fails we need to show an error + for user in Dojo_User.objects.filter(is_superuser=True): + alert = Alerts( + user_id=user, + url=kwargs.get("url", reverse("alerts")), + title=kwargs.get("title", "Notification issue")[:250], + description=kwargs.get("description", str(exception))[:2000], + icon="exclamation-triangle", + source=notification_type[:100] if notification_type else kwargs.get("source", "unknown")[:100], + ) + # relative urls will fail validation + alert.clean_fields(exclude=["url"]) + alert.save() + + +class SlackNotificationManger(NotificationManagerHelpers): + + """Manger for slack notifications and their helpers.""" + + @dojo_async_task + @app.task + def send_slack_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + try: + # If the user has slack information on profile and chooses to receive slack notifications + # Will receive a DM + if user is not None: + logger.debug("personal notification to slack for user %s", user) + if hasattr(user, "usercontactinfo") and user.usercontactinfo.slack_username is not None: + slack_user_id = user.usercontactinfo.slack_user_id + if not slack_user_id: + # Lookup the slack userid the first time, then save it. + slack_user_id = self._get_slack_user_id( + user.usercontactinfo.slack_username, + ) + if slack_user_id: + slack_user_save = UserContactInfo.objects.get( + user_id=user.id, + ) + slack_user_save.slack_user_id = slack_user_id + slack_user_save.save() + # only send notification if we managed to find the slack_user_id + if slack_user_id: + channel = f"@{slack_user_id}" + self._post_slack_message(event, user, channel, **kwargs) + else: + logger.info( + "The user %s does not have a email address informed for Slack in profile.", + user, + ) + else: + # System scope slack notifications, and not personal would still see this go through + if self.system_settings.slack_channel is not None: + channel = self.system_settings.slack_channel + logger.info( + f"Sending system notification to system channel {channel}.", + ) + self._post_slack_message(event, user, channel, **kwargs) + else: + logger.debug( + "slack_channel not configured: skipping system notification", + ) + + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Slack Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs.get("url"), + ) + + def _get_slack_user_id(self, user_email: str) -> str: + user_id = None + res = requests.request( + method="POST", + url="https://slack.com/api/users.lookupByEmail", + data={"token": self.system_settings.slack_token, "email": user_email}, + timeout=settings.REQUESTS_TIMEOUT, + ) - def _post_slack_message(channel): + user = json.loads(res.text) + slack_user_is_found = False + if user: + if "error" in user: + logger.error("Slack is complaining. See error message below.") + logger.error(user) + raise RuntimeError("Error getting user list from Slack: " + res.text) + if "email" in user["user"]["profile"]: + if user_email == user["user"]["profile"]["email"]: + if "id" in user["user"]: + user_id = user["user"]["id"] + logger.debug(f"Slack user ID is {user_id}") + slack_user_is_found = True + else: + logger.warning( + f"A user with email {user_email} could not be found in this Slack workspace.", + ) + + if not slack_user_is_found: + logger.warning("The Slack user was not found.") + + return user_id + + def _post_slack_message( + self, + event: str, + user: Dojo_User, + channel: str, + **kwargs: dict, + ) -> None: res = requests.request( method="POST", url="https://slack.com/api/chat.postMessage", data={ - "token": get_system_setting("slack_token"), + "token": self.system_settings.slack_token, "channel": channel, - "username": get_system_setting("slack_username"), - "text": create_notification_message(event, user, "slack", *args, **kwargs), + "username": self.system_settings.slack_username, + "text": self._create_notification_message(event, user, "slack", kwargs), }, timeout=settings.REQUESTS_TIMEOUT, ) @@ -236,155 +307,560 @@ def _post_slack_message(channel): logger.error(res.text) raise RuntimeError("Error posting message to Slack: " + res.text) - try: - # If the user has slack information on profile and chooses to receive slack notifications - # Will receive a DM - if user is not None: - logger.debug("personal notification to slack for user %s", user) - if hasattr(user, "usercontactinfo") and user.usercontactinfo.slack_username is not None: - slack_user_id = user.usercontactinfo.slack_user_id - if not slack_user_id: - # Lookup the slack userid the first time, then save it. - slack_user_id = get_slack_user_id( - user.usercontactinfo.slack_username) - if slack_user_id: - slack_user_save = UserContactInfo.objects.get(user_id=user.id) - slack_user_save.slack_user_id = slack_user_id - slack_user_save.save() - - # only send notification if we managed to find the slack_user_id - if slack_user_id: - channel = f"@{slack_user_id}" - _post_slack_message(channel) - else: - logger.info("The user %s does not have a email address informed for Slack in profile.", user) +class MSTeamsNotificationManger(NotificationManagerHelpers): + + """Manger for Microsoft Teams notifications and their helpers.""" + + @dojo_async_task + @app.task + def send_msteams_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + try: + # Microsoft Teams doesn't offer direct message functionality, so no MS Teams PM functionality here... + if user is None: + if self.system_settings.msteams_url is not None: + logger.debug("sending MSTeams message") + res = requests.request( + method="POST", + url=self.system_settings.msteams_url, + data=self._create_notification_message( + event, + None, + "msteams", + kwargs, + ), + timeout=settings.REQUESTS_TIMEOUT, + ) + if res.status_code != 200: + logger.error("Error when sending message to Microsoft Teams") + logger.error(res.status_code) + logger.error(res.text) + raise RuntimeError( + "Error posting message to Microsoft Teams: " + res.text, + ) + else: + logger.info( + "Webhook URL for Microsoft Teams not configured: skipping system notification", + ) + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Microsoft Teams Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs["url"], + ) + + +class EmailNotificationManger(NotificationManagerHelpers): + + """Manger for email notifications and their helpers.""" + + @dojo_async_task + @app.task + def send_mail_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + # Attempt to get the "to" address + if (recipient := kwargs.get("recipient")) is not None: + address = recipient + elif user: + address = user.email else: - # System scope slack notifications, and not personal would still see this go through - if get_system_setting("slack_channel") is not None: - channel = get_system_setting("slack_channel") - logger.info(f"Sending system notification to system channel {channel}.") - _post_slack_message(channel) + address = self.system_settings.mail_notifications_to + + logger.debug("notification email for user %s to %s", user, address) + + try: + subject = f"{self.system_settings.team_name} notification" + if (title := kwargs.get("title")) is not None: + subject += f": {title}" + + email = EmailMessage( + subject, + self._create_notification_message(event, user, "mail", kwargs), + self.system_settings.email_from, + [address], + headers={"From": f"{self.system_settings.email_from}"}, + ) + email.content_subtype = "html" + logger.debug("sending email alert") + email.send(fail_silently=False) + + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Email Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs["url"], + ) + + +class WebhookNotificationManger(NotificationManagerHelpers): + + """Manger for webhook notifications and their helpers.""" + + ERROR_PERMANENT = "permanent" + ERROR_TEMPORARY = "temporary" + + @dojo_async_task + @app.task + def send_webhooks_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + for endpoint in self._get_webhook_endpoints(user=user): + error = None + if endpoint.status not in [ + Notification_Webhooks.Status.STATUS_ACTIVE, + Notification_Webhooks.Status.STATUS_ACTIVE_TMP, + ]: + logger.info( + f"URL for Webhook '{endpoint.name}' is not active: {endpoint.get_status_display()} ({endpoint.status})", + ) + continue + + try: + logger.debug(f"Sending webhook message to endpoint '{endpoint.name}'") + res = self._webhooks_notification_request(endpoint, event, **kwargs) + if 200 <= res.status_code < 300: + logger.debug( + f"Message sent to endpoint '{endpoint.name}' successfully.", + ) + continue + # HTTP request passed successfully but we still need to check status code + if 500 <= res.status_code < 600 or res.status_code == 429: + error = self.ERROR_TEMPORARY + else: + error = self.ERROR_PERMANENT + + endpoint.note = f"Response status code: {res.status_code}" + logger.error( + f"Error when sending message to Webhooks '{endpoint.name}' (status: {res.status_code}): {res.text}", + ) + except requests.exceptions.Timeout as e: + error = self.ERROR_TEMPORARY + endpoint.note = f"Requests exception: {e}" + logger.error( + f"Timeout when sending message to Webhook '{endpoint.name}'", + ) + except Exception as exception: + error = self.ERROR_PERMANENT + endpoint.note = f"Exception: {exception}"[:1000] + logger.exception(exception) + self._log_alert(exception, "Webhooks Notification") + + now = get_current_datetime() + if error == self.ERROR_TEMPORARY: + # If endpoint is unstable for more then one day, it needs to be deactivated + if endpoint.first_error is not None and (now - endpoint.first_error).total_seconds() > 60 * 60 * 24: + endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT + else: + # We need to monitor when outage started + if endpoint.status == Notification_Webhooks.Status.STATUS_ACTIVE: + endpoint.first_error = now + endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_TMP + # In case of failure within one day, endpoint can be deactivated temporally only for one minute + self._webhook_reactivation.apply_async( + args=[self], + kwargs={"endpoint_id": endpoint.pk}, + countdown=60, + ) + # There is no reason to keep endpoint active if it is returning 4xx errors else: - logger.debug("slack_channel not configured: skipping system notification") - - except Exception as e: - logger.exception(e) - log_alert(e, "Slack Notification", title=kwargs["title"], description=str(e), url=kwargs.get("url")) - - -@dojo_async_task -@app.task -def send_msteams_notification(event, user=None, *args, **kwargs): - from dojo.utils import get_system_setting - - try: - # Microsoft Teams doesn't offer direct message functionality, so no MS Teams PM functionality here... - if user is None: - if get_system_setting("msteams_url") is not None: - logger.debug("sending MSTeams message") - res = requests.request( - method="POST", - url=get_system_setting("msteams_url"), - data=create_notification_message(event, None, "msteams", *args, **kwargs), - timeout=settings.REQUESTS_TIMEOUT, + endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT + endpoint.first_error = now + + endpoint.last_error = now + endpoint.save() + + def _get_webhook_endpoints( + self, + user: Dojo_User | None = None, + ) -> QuerySet[Notification_Webhooks]: + endpoints = Notification_Webhooks.objects.filter(owner=user) + if not endpoints.exists(): + if user: + logger.info( + f"URLs for Webhooks not configured for user '{user}': skipping user notification", ) - if res.status_code != 200: - logger.error("Error when sending message to Microsoft Teams") - logger.error(res.status_code) - logger.error(res.text) - raise RuntimeError("Error posting message to Microsoft Teams: " + res.text) else: - logger.info("Webhook URL for Microsoft Teams not configured: skipping system notification") - except Exception as e: - logger.exception(e) - log_alert(e, "Microsoft Teams Notification", title=kwargs["title"], description=str(e), url=kwargs["url"]) - - -@dojo_async_task -@app.task -def send_mail_notification(event, user=None, *args, **kwargs): - from dojo.utils import get_system_setting - email_from_address = get_system_setting("email_from") - # Attempt to get the "to" address - if "recipient" in kwargs: - address = kwargs.get("recipient") - elif user: - address = user.email - else: - address = get_system_setting("mail_notifications_to") - - logger.debug("notification email for user %s to %s", user, address) - - try: - subject = f"{get_system_setting('team_name')} notification" - if "title" in kwargs: - subject += f": {kwargs['title']}" - - email = EmailMessage( - subject, - create_notification_message(event, user, "mail", *args, **kwargs), - email_from_address, - [address], - headers={"From": f"{email_from_address}"}, + logger.info( + "URLs for Webhooks not configured: skipping system notification", + ) + return Notification_Webhooks.objects.none() + return endpoints + + def _generate_request_details( + self, + endpoint: Notification_Webhooks, + event: str | None = None, + **kwargs: dict, + ) -> tuple[dict, dict]: + headers = { + "User-Agent": f"DefectDojo-{dd_version}", + "X-DefectDojo-Event": event, + "X-DefectDojo-Instance": settings.SITE_URL, + "Accept": "application/json", + } + if endpoint.header_name is not None: + headers[endpoint.header_name] = endpoint.header_value + yaml_data = self._create_notification_message( + event, + endpoint.owner, + "webhooks", + kwargs, + ) + data = yaml.safe_load(yaml_data) + + return headers, data + + def _webhooks_notification_request( + self, + endpoint: Notification_Webhooks, + event: str | None = None, + **kwargs: dict, + ) -> requests.Response: + headers, data = self._generate_request_details(endpoint, event=event, **kwargs) + return requests.request( + method="POST", + url=endpoint.url, + headers=headers, + json=data, + timeout=self.system_settings.webhooks_notifications_timeout, ) - email.content_subtype = "html" - logger.debug("sending email alert") - # logger.info(create_notification_message(event, user, 'mail', *args, **kwargs)) - email.send(fail_silently=False) - - except Exception as e: - logger.exception(e) - log_alert(e, "Email Notification", title=kwargs["title"], description=str(e), url=kwargs["url"]) - - -def webhooks_notification_request(endpoint, event, *args, **kwargs): - from dojo.utils import get_system_setting - - headers = { - "User-Agent": f"DefectDojo-{dd_version}", - "X-DefectDojo-Event": event, - "X-DefectDojo-Instance": settings.SITE_URL, - "Accept": "application/json", - } - if endpoint.header_name is not None: - headers[endpoint.header_name] = endpoint.header_value - yaml_data = create_notification_message(event, endpoint.owner, "webhooks", *args, **kwargs) - data = yaml.safe_load(yaml_data) - - timeout = get_system_setting("webhooks_notifications_timeout") - - return requests.request( - method="POST", - url=endpoint.url, - headers=headers, - json=data, - timeout=timeout, - ) + def _test_webhooks_notification(self, endpoint: Notification_Webhooks) -> None: + res = self._webhooks_notification_request( + endpoint, + "ping", + description="Test webhook notification", + ) + res.raise_for_status() + # in "send_webhooks_notification", we are doing deeper analysis, why it failed + # for now, "raise_for_status" should be enough + + @app.task(ignore_result=True) + def _webhook_reactivation(self, endpoint_id: int, **_kwargs: dict): + endpoint = Notification_Webhooks.objects.get(pk=endpoint_id) + # User already changed status of endpoint + if endpoint.status != Notification_Webhooks.Status.STATUS_INACTIVE_TMP: + return + endpoint.status = Notification_Webhooks.Status.STATUS_ACTIVE_TMP + endpoint.save() + logger.debug( + f"Webhook endpoint '{endpoint.name}' reactivated to '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}'", + ) -def test_webhooks_notification(endpoint): - res = webhooks_notification_request(endpoint, "ping", description="Test webhook notification") - res.raise_for_status() - # in "send_webhooks_notification", we are doing deeper analysis, why it failed - # for now, "raise_for_status" should be enough +class AlertNotificationManger(NotificationManagerHelpers): -@app.task(ignore_result=True) -def webhook_reactivation(endpoint_id: int, *args, **kwargs): - endpoint = Notification_Webhooks.objects.get(pk=endpoint_id) + """Manger for alert notifications and their helpers.""" - # User already changed status of endpoint - if endpoint.status != Notification_Webhooks.Status.STATUS_INACTIVE_TMP: - return + def send_alert_notification( + self, + event: str, + user: Dojo_User | None = None, + **kwargs: dict, + ): + logger.debug("sending alert notification to %s", user) + try: + # no need to differentiate between user/no user + icon = kwargs.get("icon", "info-circle") + try: + source = Notifications._meta.get_field(event).verbose_name.title()[:100] + except FieldDoesNotExist: + source = event.replace("_", " ").title()[:100] + alert = Alerts( + user_id=user, + title=kwargs.get("title")[:250], + description=self._create_notification_message( + event, + user, + "alert", + kwargs, + )[:2000], + url=kwargs.get("url", reverse("alerts")), + icon=icon[:25], + source=source, + ) + # relative urls will fail validation + alert.clean_fields(exclude=["url"]) + alert.save() + except Exception as exception: + logger.exception(exception) + self._log_alert( + exception, + "Alert Notification", + title=kwargs["title"], + description=str(exception), + url=kwargs["url"], + ) + + +class NotificationManager(NotificationManagerHelpers): + + """Manage the construction and dispatch of notifications.""" + + def __init__(self, *args: list, **kwargs: dict) -> None: + NotificationManagerHelpers.__init__(self, *args, **kwargs) + + def create_notification(self, event: str | None = None, **kwargs: dict) -> None: + # Process the notifications for a given list of recipients + if kwargs.get("recipients") is not None: + self._process_recipients(event=event, **kwargs) + else: + logger.debug("creating system notifications for event: %s", event) + # send system notifications to all admin users + self._process_objects(**kwargs) + # System notifications are sent one with user=None, which will trigger email to configured system email, to global slack channel, etc. + self._process_notifications( + event, + notifications=self.system_notifications, + **kwargs, + ) + # All admins will also receive system notifications, but as part of the person global notifications section below + # This time user is set, so will trigger email to personal email, to personal slack channel (mention), etc. + # only retrieve users which have at least one notification type enabled for this event type. + logger.debug("creating personal notifications for event: %s", event) + # There are notification like deleting a product type that shall not be sent to users. + # These notifications will have the parameter no_users=True + if kwargs.get("no_users", False) is False: + # get users with either global notifications, or a product specific notification + # and all admin/superuser, they will always be notified + for user in self._get_user_to_send_notifications_to(): + self._send_single_notification_to_user(user, event=event, **kwargs) + + def _process_recipients(self, event: str | None = None, **kwargs: dict) -> None: + # mimic existing code so that when recipients is specified, no other system or personal notifications are sent. + logger.debug("creating notifications for recipients: %s", kwargs["recipients"]) + for recipient_notifications in Notifications.objects.filter( + user__username__in=kwargs["recipients"], + user__is_active=True, + product=None, + ): + if event in settings.NOTIFICATIONS_SYSTEM_LEVEL_TRUMP: + # merge the system level notifications with the personal level + # this allows for system to trump the personal + merged_notifications = Notifications.merge_notifications_list( + [self.system_notifications, recipient_notifications], + ) + merged_notifications.user = recipient_notifications.user + logger.debug("Sent notification to %s", merged_notifications.user) + self._process_notifications( + event, + notifications=merged_notifications, + **kwargs, + ) + else: + # Do not trump user preferences and send notifications as usual + logger.debug("Sent notification to %s", recipient_notifications.user) + self._process_notifications( + event, + notifications=recipient_notifications, + **kwargs, + ) - endpoint.status = Notification_Webhooks.Status.STATUS_ACTIVE_TMP - endpoint.save() - logger.debug(f"Webhook endpoint '{endpoint.name}' reactivated to '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}'") + def _process_objects(self, **kwargs: dict) -> None: + """Extract the product and product type from the kwargs.""" + self.product_type: Product_Type = None + self.product: Product = None + if (product_type := kwargs.get("product_type")) is not None: + self.product_type = product_type + logger.debug("Defined product type %s", self.product_type) + if (product := kwargs.get("product")) is not None: + self.product = product + logger.debug("Defined product %s", self.product) + elif (engagement := kwargs.get("engagement")) is not None: + self.product = engagement.product + logger.debug("Defined product of engagement %s", self.product) + elif (test := kwargs.get("test")) is not None: + self.product = test.engagement.product + logger.debug("Defined product of test %s", self.product) + elif (finding := kwargs.get("finding")) is not None: + self.product = finding.test.engagement.product + logger.debug("Defined product of finding %s", self.product) + elif (obj := kwargs.get("obj")) is not None: + from dojo.utils import get_product + + self.product = get_product(obj) + logger.debug("Defined product of obj %s", self.product) + + def _get_user_to_send_notifications_to( + self, + ) -> QuerySet[Dojo_User]: + """Determine the users we should send notifications to based on product and product type permissions.""" + users = ( + Dojo_User.objects.filter(is_active=True) + .prefetch_related( + Prefetch( + "notifications_set", + queryset=Notifications.objects.filter( + Q(product_id=self.product) | Q(product__isnull=True), + ), + to_attr="applicable_notifications", + ), + ) + .annotate( + applicable_notifications_count=Count( + "notifications__id", + filter=Q(notifications__product_id=self.product) | Q(notifications__product__isnull=True), + ), + ) + .filter(Q(applicable_notifications_count__gt=0) | Q(is_superuser=True)) + ) + # only send to authorized users or admin/superusers + logger.debug("Filtering users for the product %s", self.product) + if self.product is not None: + users = get_authorized_users_for_product_and_product_type( + users, + self.product, + Permissions.Product_View, + ) + elif self.product_type is not None: + users = get_authorized_users_for_product_type( + users, + self.product_type, + Permissions.Product_Type_View, + ) + else: + # nor product_type nor product defined, we should not make noise and send only notifications to admins + logger.debug("Product is not specified, making it silent") + users = users.filter(is_superuser=True) + return users + + def _send_single_notification_to_user( + self, + user: Dojo_User, + event: str | None = None, + **kwargs: dict, + ) -> None: + """Send a notification to a single user.""" + logger.debug("Authorized user for the product %s", user) + # send notifications to user after merging possible multiple notifications records (i.e. personal global + personal product) + # kwargs.update({'user': user}) + applicable_notifications = user.applicable_notifications + if user.is_superuser: + # admin users get all system notifications + logger.debug("User %s is superuser", user) + applicable_notifications.append(self.system_notifications) + + notifications_set = Notifications.merge_notifications_list( + applicable_notifications, + ) + notifications_set.user = user + self._process_notifications(event, notifications=notifications_set, **kwargs) + + def _get_manager_instance( + self, + alert_type: str, + ) -> type[NotificationManagerHelpers]: + kwargs = { + "system_notifications": self.system_notifications, + "system_settings": self.system_settings, + } + if alert_type == "slack": + return SlackNotificationManger(**kwargs) + if alert_type == "msteams": + return MSTeamsNotificationManger(**kwargs) + if alert_type == "mail": + return EmailNotificationManger(**kwargs) + if alert_type == "webhooks": + return WebhookNotificationManger(**kwargs) + if alert_type == "alert": + return AlertNotificationManger(**kwargs) + + msg = f"Unsupported alert type: {alert_type}" + raise TypeError(msg) + + def _process_notifications( + self, + event: str | None, + notifications: Notifications | None = None, + **kwargs: dict, + ) -> None: + # Quick break out if we do not have any work to do + if not notifications: + logger.warning("no notifications!") + return + + logger.debug( + "sending notification " + ("asynchronously" if we_want_async() else "synchronously"), + ) + logger.debug("process notifications for %s", notifications.user) + + if self.system_settings.enable_slack_notifications and "slack" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending Slack Notification") + self._get_manager_instance("slack").send_slack_notification( + event, + user=notifications.user, + **kwargs, + ) + + if self.system_settings.enable_msteams_notifications and "msteams" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending MSTeams Notification") + self._get_manager_instance("msteams").send_msteams_notification( + event, + user=notifications.user, + **kwargs, + ) + + if self.system_settings.enable_mail_notifications and "mail" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending Mail Notification") + self._get_manager_instance("mail").send_mail_notification( + event, + user=notifications.user, + **kwargs, + ) + + if self.system_settings.enable_webhooks_notifications and "webhooks" in getattr( + notifications, + event, + getattr(notifications, "other"), + ): + logger.debug("Sending Webhooks Notification") + self._get_manager_instance("webhooks").send_webhooks_notification( + event, + user=notifications.user, + **kwargs, + ) + + if "alert" in getattr(notifications, event, getattr(notifications, "other")): + logger.debug(f"Sending Alert to {notifications.user}") + self._get_manager_instance("alert").send_alert_notification( + event, + user=notifications.user, + **kwargs, + ) @app.task(ignore_result=True) -def webhook_status_cleanup(*args, **kwargs): +def webhook_status_cleanup(*_args: list, **_kwargs: dict): # If some endpoint was affected by some outage (5xx, 429, Timeout) but it was clean during last 24 hours, # we consider this endpoint as healthy so need to reset it endpoints = Notification_Webhooks.objects.filter( @@ -397,7 +873,9 @@ def webhook_status_cleanup(*args, **kwargs): endpoint.last_error = None endpoint.note = f"Reactivation from {Notification_Webhooks.Status.STATUS_ACTIVE_TMP}" endpoint.save() - logger.debug(f"Webhook endpoint '{endpoint.name}' reactivated from '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}' to '{Notification_Webhooks.Status.STATUS_ACTIVE}'") + logger.debug( + f"Webhook endpoint '{endpoint.name}' reactivated from '{Notification_Webhooks.Status.STATUS_ACTIVE_TMP}' to '{Notification_Webhooks.Status.STATUS_ACTIVE}'", + ) # Reactivation of STATUS_INACTIVE_TMP endpoints. # They should reactive automatically in 60s, however in case of some unexpected event (e.g. start of whole stack), @@ -407,180 +885,5 @@ def webhook_status_cleanup(*args, **kwargs): last_error__lt=get_current_datetime() - timedelta(minutes=5), ) for endpoint in broken_endpoints: - webhook_reactivation(endpoint_id=endpoint.pk) - - -@dojo_async_task -@app.task -def send_webhooks_notification(event, user=None, *args, **kwargs): - - ERROR_PERMANENT = "permanent" - ERROR_TEMPORARY = "temporary" - - endpoints = Notification_Webhooks.objects.filter(owner=user) - - if not endpoints: - if user: - logger.info(f"URLs for Webhooks not configured for user '{user}': skipping user notification") - else: - logger.info("URLs for Webhooks not configured: skipping system notification") - return - - for endpoint in endpoints: - - error = None - if endpoint.status not in [Notification_Webhooks.Status.STATUS_ACTIVE, Notification_Webhooks.Status.STATUS_ACTIVE_TMP]: - logger.info(f"URL for Webhook '{endpoint.name}' is not active: {endpoint.get_status_display()} ({endpoint.status})") - continue - - try: - logger.debug(f"Sending webhook message to endpoint '{endpoint.name}'") - res = webhooks_notification_request(endpoint, event, *args, **kwargs) - - if 200 <= res.status_code < 300: - logger.debug(f"Message sent to endpoint '{endpoint.name}' successfully.") - continue - - # HTTP request passed successfully but we still need to check status code - error = ERROR_TEMPORARY if 500 <= res.status_code < 600 or res.status_code == 429 else ERROR_PERMANENT - - endpoint.note = f"Response status code: {res.status_code}" - logger.error(f"Error when sending message to Webhooks '{endpoint.name}' (status: {res.status_code}): {res.text}") - - except requests.exceptions.Timeout as e: - error = ERROR_TEMPORARY - endpoint.note = f"Requests exception: {e}" - logger.error(f"Timeout when sending message to Webhook '{endpoint.name}'") - - except Exception as e: - error = ERROR_PERMANENT - endpoint.note = f"Exception: {e}"[:1000] - logger.exception(e) - log_alert(e, "Webhooks Notification") - - now = get_current_datetime() - - if error == ERROR_TEMPORARY: - - # If endpoint is unstable for more then one day, it needs to be deactivated - if endpoint.first_error is not None and (now - endpoint.first_error).total_seconds() > 60 * 60 * 24: - endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT - - else: - # We need to monitor when outage started - if endpoint.status == Notification_Webhooks.Status.STATUS_ACTIVE: - endpoint.first_error = now - - endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_TMP - - # In case of failure within one day, endpoint can be deactivated temporally only for one minute - webhook_reactivation.apply_async(kwargs={"endpoint_id": endpoint.pk}, countdown=60) - - # There is no reason to keep endpoint active if it is returning 4xx errors - else: - endpoint.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT - endpoint.first_error = now - - endpoint.last_error = now - endpoint.save() - - -def send_alert_notification(event, user=None, *args, **kwargs): - logger.debug("sending alert notification to %s", user) - try: - # no need to differentiate between user/no user - icon = kwargs.get("icon", "info-circle") - try: - source = Notifications._meta.get_field(event).verbose_name.title()[:100] - except FieldDoesNotExist: - source = event.replace("_", " ").title()[:100] - alert = Alerts( - user_id=user, - title=kwargs.get("title")[:250], - description=create_notification_message(event, user, "alert", *args, **kwargs)[:2000], - url=kwargs.get("url", reverse("alerts")), - icon=icon[:25], - source=source, - ) - # relative urls will fail validation - alert.clean_fields(exclude=["url"]) - alert.save() - except Exception as e: - logger.exception(e) - log_alert(e, "Alert Notification", title=kwargs["title"], description=str(e), url=kwargs["url"]) - - -def get_slack_user_id(user_email): - - from dojo.utils import get_system_setting - - user_id = None - - res = requests.request( - method="POST", - url="https://slack.com/api/users.lookupByEmail", - data={"token": get_system_setting("slack_token"), "email": user_email}, - timeout=settings.REQUESTS_TIMEOUT, - ) - - user = json.loads(res.text) - - slack_user_is_found = False - if user: - if "error" in user: - logger.error("Slack is complaining. See error message below.") - logger.error(user) - raise RuntimeError("Error getting user list from Slack: " + res.text) - if "email" in user["user"]["profile"]: - if user_email == user["user"]["profile"]["email"]: - if "id" in user["user"]: - user_id = user["user"]["id"] - logger.debug(f"Slack user ID is {user_id}") - slack_user_is_found = True - else: - logger.warning(f"A user with email {user_email} could not be found in this Slack workspace.") - - if not slack_user_is_found: - logger.warning("The Slack user was not found.") - - return user_id - - -def log_alert(e, notification_type=None, *args, **kwargs): - # no try catch here, if this fails we need to show an error - - users = Dojo_User.objects.filter(is_superuser=True) - for user in users: - alert = Alerts( - user_id=user, - url=kwargs.get("url", reverse("alerts")), - title=kwargs.get("title", "Notification issue")[:250], - description=kwargs.get("description", str(e))[:2000], - icon="exclamation-triangle", - source=notification_type[:100] if notification_type else kwargs.get("source", "unknown")[:100]) - # relative urls will fail validation - alert.clean_fields(exclude=["url"]) - alert.save() - - -def notify_test_created(test): - title = "Test created for " + str(test.engagement.product) + ": " + str(test.engagement.name) + ": " + str(test) - create_notification(event="test_added", title=title, test=test, engagement=test.engagement, product=test.engagement.product, - url=reverse("view_test", args=(test.id,)), url_api=reverse("test-detail", args=(test.id,))) - - -def notify_scan_added(test, updated_count, new_findings=[], findings_mitigated=[], findings_reactivated=[], findings_untouched=[]): - logger.debug("Scan added notifications") - - new_findings = sorted(new_findings, key=lambda x: x.numerical_severity) - findings_mitigated = sorted(findings_mitigated, key=lambda x: x.numerical_severity) - findings_reactivated = sorted(findings_reactivated, key=lambda x: x.numerical_severity) - findings_untouched = sorted(findings_untouched, key=lambda x: x.numerical_severity) - - title = "Created/Updated " + str(updated_count) + " findings for " + str(test.engagement.product) + ": " + str(test.engagement.name) + ": " + str(test) - - event = "scan_added_empty" if updated_count == 0 else "scan_added" - - create_notification(event=event, title=title, findings_new=new_findings, findings_mitigated=findings_mitigated, findings_reactivated=findings_reactivated, - finding_count=updated_count, test=test, engagement=test.engagement, product=test.engagement.product, findings_untouched=findings_untouched, - url=reverse("view_test", args=(test.id,)), url_api=reverse("test-detail", args=(test.id,))) + manager = WebhookNotificationManger() + manager._webhook_reactivation(manager, endpoint_id=endpoint.pk) diff --git a/dojo/notifications/views.py b/dojo/notifications/views.py index 7fe5562ee7e..7fc58803fc4 100644 --- a/dojo/notifications/views.py +++ b/dojo/notifications/views.py @@ -11,7 +11,7 @@ from dojo.forms import DeleteNotificationsWebhookForm, NotificationsForm, NotificationsWebhookForm from dojo.models import Notification_Webhooks, Notifications -from dojo.notifications.helper import test_webhooks_notification +from dojo.notifications.helper import NotificationManagerHelpers from dojo.utils import add_breadcrumb, get_enabled_notifications_list, get_system_setting logger = logging.getLogger(__name__) @@ -136,6 +136,9 @@ def set_breadcrumbs(self, request: HttpRequest): class NotificationWebhooksView(View): + def get_webhook_manager_instance(self) -> type[NotificationManagerHelpers]: + return Notification_Webhooks() + def check_webhooks_enabled(self): if not get_system_setting("enable_webhooks_notifications"): raise Http404 @@ -216,7 +219,7 @@ def process_form(self, request: HttpRequest, context: dict): form = context["form"] if form.is_valid(): try: - test_webhooks_notification(form.instance) + self.get_webhook_manager_instance().test_webhooks_notification(form.instance) except requests.exceptions.RequestException as e: messages.add_message( request, @@ -305,7 +308,7 @@ def process_form(self, request: HttpRequest, nwh: Notification_Webhooks, context if form.is_valid(): try: - test_webhooks_notification(form.instance) + self.get_webhook_manager_instance().test_webhooks_notification(form.instance) except requests.exceptions.RequestException as e: messages.add_message( request, diff --git a/unittests/test_notifications.py b/unittests/test_notifications.py index cccdb2e3d6b..860c2168599 100644 --- a/unittests/test_notifications.py +++ b/unittests/test_notifications.py @@ -1,6 +1,6 @@ import datetime import logging -from unittest.mock import patch +from unittest.mock import Mock, patch from auditlog.context import set_actor from crum import impersonate @@ -10,11 +10,12 @@ from rest_framework.authtoken.models import Token from rest_framework.test import APIClient, APITestCase -import dojo.notifications.helper as notifications_helper from dojo import __version__ as dd_version +from dojo.importers.base_importer import BaseImporter from dojo.models import ( DEFAULT_NOTIFICATION, Alerts, + Development_Environment, Dojo_User, Endpoint, Engagement, @@ -31,10 +32,9 @@ get_current_datetime, ) from dojo.notifications.helper import ( + AlertNotificationManger, + WebhookNotificationManger, create_notification, - send_alert_notification, - send_webhooks_notification, - webhook_reactivation, webhook_status_cleanup, ) @@ -90,107 +90,115 @@ def test_merge_notifications_list(self): self.assertEqual(len(merged_notifications.other), 3) self.assertEqual(merged_notifications.other, {"alert", "mail", "slack"}) - @patch("dojo.notifications.helper.send_alert_notification", wraps=send_alert_notification) - def test_notifications_system_level_trump(self, mock): + # @patch("dojo.notifications.helper.AlertNotificationManger.send_alert_notification", wraps=AlertNotificationManger.send_alert_notification) + @patch("dojo.notifications.helper.NotificationManager._get_manager_instance") + def test_notifications_system_level_trump(self, mock_get_manager_instance): + mock_manager = Mock(wraps=AlertNotificationManger()) + mock_get_manager_instance.return_value = mock_manager + notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username="admin")) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system off"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system on"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) # Small note for this test-cast: Trump works only in positive direction - system is not able to disable some kind of notification if user enabled it - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system off"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system on"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) - last_count = mock.call_count + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) + last_count = mock_manager.send_alert_notification.call_count + + # @patch("dojo.notifications.helper.AlertNotificationManger.send_alert_notification", wraps=AlertNotificationManger.send_alert_notification) + @patch("dojo.notifications.helper.NotificationManager._get_manager_instance") + def test_non_default_other_notifications(self, mock_get_manager_instance): + mock_manager = Mock(wraps=AlertNotificationManger()) + mock_get_manager_instance.return_value = mock_manager - @patch("dojo.notifications.helper.send_alert_notification", wraps=send_alert_notification) - def test_non_default_other_notifications(self, mock): notif_user, _ = Notifications.objects.get_or_create(user=User.objects.get(username="admin")) notif_system, _ = Notifications.objects.get_or_create(user=None, template=False) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("do not notify other"): notif_user.other = () # no alert notif_user.save() create_notification(event="dummy_bar_event", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("notify other"): notif_user.other = DEFAULT_NOTIFICATION # alert only notif_user.save() create_notification(event="dummy_foo_event", title="title_for_dummy_foo_event", description="description_for_dummy_foo_event", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) - self.assertEqual(mock.call_args_list[0].args[0], "dummy_foo_event") + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_args_list[0].args[0], "dummy_foo_event") alert = Alerts.objects.get(title="title_for_dummy_foo_event") self.assertEqual(alert.source, "Dummy Foo Event") - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system off"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 0) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 0) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user off, system on"): notif_user.user_mentioned = () # no alert notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) # Small note for this test-cast: Trump works only in positive direction - system is not able to disable some kind of notification if user enabled it - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system off"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = () # no alert notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) - last_count = mock.call_count + last_count = mock_manager.send_alert_notification.call_count with self.subTest("user on, system on"): notif_user.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_user.save() notif_system.user_mentioned = DEFAULT_NOTIFICATION # alert only notif_system.save() create_notification(event="user_mentioned", title="user_mentioned", recipients=["admin"]) - self.assertEqual(mock.call_count, last_count + 1) + self.assertEqual(mock_manager.send_alert_notification.call_count, last_count + 1) class TestNotificationTriggers(DojoTestCase): @@ -199,7 +207,7 @@ class TestNotificationTriggers(DojoTestCase): def setUp(self): self.notification_tester = Dojo_User.objects.get(username="admin") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_product_types(self, mock): last_count = mock.call_count @@ -219,7 +227,7 @@ def test_product_types(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The product type "notif prod type" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], "/product/type") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_products(self, mock): last_count = mock.call_count @@ -240,7 +248,7 @@ def test_products(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The product "prod name" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], "/product") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_engagements(self, mock): last_count = mock.call_count @@ -300,7 +308,7 @@ def test_engagements(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The engagement "Testing engagement" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/product/{prod2.id}") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_endpoints(self, mock): prod_type = Product_Type.objects.first() prod1, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name 1") @@ -323,7 +331,7 @@ def test_endpoints(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The endpoint "host2" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], "/endpoint") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_tests(self, mock): prod_type = Product_Type.objects.first() prod, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name") @@ -347,7 +355,7 @@ def test_tests(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The test "Acunetix Scan" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/engagement/{eng2.id}") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") def test_finding_groups(self, mock): prod_type = Product_Type.objects.first() prod, _ = Product.objects.get_or_create(prod_type=prod_type, name="prod name") @@ -372,7 +380,7 @@ def test_finding_groups(self, mock): self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The finding group "fg test" was deleted by admin') self.assertEqual(mock.call_args_list[-1].kwargs["url"], f"/test/{test2.id}") - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") @override_settings(ENABLE_AUDITLOG=True) def test_auditlog_on(self, mock): prod_type = Product_Type.objects.create(name="notif prod type") @@ -380,7 +388,7 @@ def test_auditlog_on(self, mock): prod_type.delete() self.assertEqual(mock.call_args_list[-1].kwargs["description"], 'The product type "notif prod type" was deleted by admin') - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") @override_settings(ENABLE_AUDITLOG=False) def test_auditlog_off(self, mock): prod_type = Product_Type.objects.create(name="notif prod type") @@ -397,7 +405,7 @@ def setUp(self): self.client = APIClient() self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) - @patch("dojo.notifications.helper.process_notifications") + @patch("dojo.notifications.helper.NotificationManager._process_notifications") @override_settings(ENABLE_AUDITLOG=True) def test_auditlog_on(self, mock): prod_type = Product_Type.objects.create(name="notif prod type API") @@ -427,26 +435,30 @@ def test_missing_system_webhook(self): # test data contains 2 entries but we need to test missing definition Notification_Webhooks.objects.all().delete() with self.assertLogs("dojo.notifications.helper", level="INFO") as cm: - send_webhooks_notification(event="dummy") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy") self.assertIn("URLs for Webhooks not configured: skipping system notification", cm.output[0]) def test_missing_personal_webhook(self): # test data contains 2 entries but we need to test missing definition Notification_Webhooks.objects.all().delete() with self.assertLogs("dojo.notifications.helper", level="INFO") as cm: - send_webhooks_notification(event="dummy", user=Dojo_User.objects.get(username="admin")) + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", user=Dojo_User.objects.get(username="admin")) self.assertIn("URLs for Webhooks not configured for user '(admin)': skipping user notification", cm.output[0]) def test_system_webhook_inactive(self): self.sys_wh.status = Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="INFO") as cm: - send_webhooks_notification(event="dummy") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy") self.assertIn("URL for Webhook 'My webhook endpoint' is not active: Permanently inactive (inactive_permanent)", cm.output[0]) def test_system_webhook_sucessful(self): with self.assertLogs("dojo.notifications.helper", level="DEBUG") as cm: - send_webhooks_notification(event="dummy") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy") self.assertIn("Message sent to endpoint 'My webhook endpoint' successfully.", cm.output[-1]) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() @@ -459,7 +471,8 @@ def test_system_webhook_4xx(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") self.assertIn("Error when sending message to Webhooks 'My webhook endpoint' (status: 400)", cm.output[-1]) updated_wh = Notification_Webhooks.objects.all().filter(owner=None).first() @@ -472,7 +485,8 @@ def test_system_webhook_first_5xx(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_TMP) @@ -490,7 +504,8 @@ def test_system_webhook_second_5xx_within_one_day(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_TMP) @@ -510,7 +525,8 @@ def test_system_webhook_third_5xx_after_more_then_day(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT) @@ -522,7 +538,8 @@ def test_system_webhook_third_5xx_after_more_then_day(self): def test_webhook_reactivation(self): with self.subTest("active"): wh = Notification_Webhooks.objects.filter(owner=None).first() - webhook_reactivation(endpoint_id=wh.pk) + manager = WebhookNotificationManger() + manager._webhook_reactivation(manager, endpoint_id=wh.pk) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_ACTIVE) @@ -540,7 +557,8 @@ def test_webhook_reactivation(self): wh.save() with self.assertLogs("dojo.notifications.helper", level="DEBUG") as cm: - webhook_reactivation(endpoint_id=wh.pk) + manager = WebhookNotificationManger() + manager._webhook_reactivation(manager, endpoint_id=wh.pk) updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_ACTIVE_TMP) @@ -640,7 +658,8 @@ def test_system_webhook_timeout(self): system_settings.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_TMP) @@ -655,7 +674,8 @@ def test_system_webhook_wrong_fqdn(self): self.sys_wh.save() with self.assertLogs("dojo.notifications.helper", level="ERROR") as cm: - send_webhooks_notification(event="dummy", title="Dummy event") + manager = WebhookNotificationManger() + manager.send_webhooks_notification(event="dummy", title="Dummy event") updated_wh = Notification_Webhooks.objects.filter(owner=None).first() self.assertEqual(updated_wh.status, Notification_Webhooks.Status.STATUS_INACTIVE_PERMANENT) @@ -751,7 +771,15 @@ def test_events_messages(self, mock): with self.subTest("test_added"): test = Test.objects.create(title="notif test", engagement=eng, target_start=timezone.now(), target_end=timezone.now(), test_type_id=Test_Type.objects.first().id) - notifications_helper.notify_test_created(test) + create_notification( + event="test_added", + title=f"Test created for {test.engagement.product}: {test.engagement.name}: {test}", + test=test, + engagement=test.engagement, + product=test.engagement.product, + url=reverse("view_test", args=(test.id,)), + url_api=reverse("test-detail", args=(test.id,)), + ) self.assertEqual(mock.call_args.kwargs["headers"]["X-DefectDojo-Event"], "test_added") self.maxDiff = None self.assertEqual(mock.call_args.kwargs["json"], { @@ -787,7 +815,10 @@ def test_events_messages(self, mock): }) with self.subTest("scan_added_empty"): - notifications_helper.notify_scan_added(test, updated_count=0) + BaseImporter( + environment=Development_Environment.objects.get_or_create(name="Development")[0], + scan_type="ZAP Scan", + ).notify_scan_added(test, updated_count=0) self.assertEqual(mock.call_args.kwargs["headers"]["X-DefectDojo-Event"], "scan_added_empty") self.maxDiff = None self.assertEqual(mock.call_args.kwargs["json"], { @@ -830,7 +861,11 @@ def test_events_messages(self, mock): }) with self.subTest("scan_added"): - notifications_helper.notify_scan_added(test, + BaseImporter( + environment=Development_Environment.objects.get_or_create(name="Development")[0], + scan_type="ZAP Scan", + ).notify_scan_added( + test, updated_count=4, new_findings=[ Finding.objects.create(test=test, title="New Finding", severity="Critical"), From e939ca195954cec84e97bfe158effaa4639ceeab Mon Sep 17 00:00:00 2001 From: manuelsommer <47991713+manuel-sommer@users.noreply.github.com> Date: Wed, 18 Dec 2024 05:08:42 +0100 Subject: [PATCH 3/6] fix typo in docs (#11387) --- .../archived_docs/integrations/social-authentication.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/en/open_source/archived_docs/integrations/social-authentication.md b/docs/content/en/open_source/archived_docs/integrations/social-authentication.md index db2a536f775..640beaeff18 100644 --- a/docs/content/en/open_source/archived_docs/integrations/social-authentication.md +++ b/docs/content/en/open_source/archived_docs/integrations/social-authentication.md @@ -213,7 +213,7 @@ This will ensure the user is added to all the groups found in the Azure AD Token The Azure AD token returned by Azure will also need to be configured to include group IDs. Without this step, the token will not contain any notion of a group, and the mapping process will report that the current user is not a member of any -groups. To update the the format of the token, add a group claim that applies to whatever group type you are using. +groups. To update the format of the token, add a group claim that applies to whatever group type you are using. If unsure of what type that is, select `All Groups`. Do not activate `Emit groups as role claims` within the Azure AD "Token configuration" page. From 7f7803a8e132959a5769c882a3c082212de77348 Mon Sep 17 00:00:00 2001 From: Cody Maffucci <46459665+Maffooch@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:29:30 -0600 Subject: [PATCH 4/6] Qualys Hacker Guardian: Set Dedupe Config (#11442) --- dojo/settings/settings.dist.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index 483688dcd4e..df655d0190c 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -1290,6 +1290,7 @@ def saml2_attrib_map_format(dict): "HackerOne Cases": ["title", "severity"], "KrakenD Audit Scan": ["description", "mitigation", "severity"], "Red Hat Satellite": ["description", "severity"], + "Qualys Hacker Guardian Scan": ["title", "severity", "description"], } # Override the hardcoded settings here via the env var @@ -1535,6 +1536,7 @@ def saml2_attrib_map_format(dict): "KrakenD Audit Scan": DEDUPE_ALGO_HASH_CODE, "PTART Report": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, "Red Hat Satellite": DEDUPE_ALGO_HASH_CODE, + "Qualys Hacker Guardian Scan": DEDUPE_ALGO_HASH_CODE, } # Override the hardcoded settings here via the env var From f2484142d4444519e45aa8b7f49d877777d8b0c0 Mon Sep 17 00:00:00 2001 From: Harold Blankenship <36673698+hblankenship@users.noreply.github.com> Date: Fri, 20 Dec 2024 16:24:07 -0600 Subject: [PATCH 5/6] Dedupe settings for Horusec Scan (#11418) --- dojo/settings/settings.dist.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index df655d0190c..6022f1704cd 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -1211,6 +1211,7 @@ def saml2_attrib_map_format(dict): "Dependency Check Scan": ["title", "cwe", "file_path"], "Dockle Scan": ["title", "description", "vuln_id_from_tool"], "Dependency Track Finding Packaging Format (FPF) Export": ["component_name", "component_version", "vulnerability_ids"], + "Horusec Scan": ["title", "description", "file_path", "line"], "Mobsfscan Scan": ["title", "severity", "cwe", "file_path", "description"], "Tenable Scan": ["title", "severity", "vulnerability_ids", "cwe", "description"], "Nexpose Scan": ["title", "severity", "vulnerability_ids", "cwe"], @@ -1430,6 +1431,7 @@ def saml2_attrib_map_format(dict): "Cobalt.io API": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, "Crunch42 Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, "Dependency Track Finding Packaging Format (FPF) Export": DEDUPE_ALGO_HASH_CODE, + "Horusec Scan": DEDUPE_ALGO_HASH_CODE, "Mobsfscan Scan": DEDUPE_ALGO_HASH_CODE, "SonarQube Scan detailed": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, "SonarQube Scan": DEDUPE_ALGO_HASH_CODE, From e8c98f10889c59e3e95e5605fbfaf8c90076c18c Mon Sep 17 00:00:00 2001 From: DefectDojo release bot Date: Mon, 23 Dec 2024 15:44:13 +0000 Subject: [PATCH 6/6] Update versions in application files --- components/package.json | 2 +- dojo/__init__.py | 2 +- helm/defectdojo/Chart.yaml | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/components/package.json b/components/package.json index febe451775d..6ff5bff877e 100644 --- a/components/package.json +++ b/components/package.json @@ -1,6 +1,6 @@ { "name": "defectdojo", - "version": "2.42.0-dev", + "version": "2.41.3", "license" : "BSD-3-Clause", "private": true, "dependencies": { diff --git a/dojo/__init__.py b/dojo/__init__.py index 7edf826dd58..d6ab484dc20 100644 --- a/dojo/__init__.py +++ b/dojo/__init__.py @@ -4,6 +4,6 @@ # Django starts so that shared_task will use this app. from .celery import app as celery_app # noqa: F401 -__version__ = "2.41.2" +__version__ = "2.41.3" __url__ = "https://github.com/DefectDojo/django-DefectDojo" __docs__ = "https://documentation.defectdojo.com" diff --git a/helm/defectdojo/Chart.yaml b/helm/defectdojo/Chart.yaml index ab66f338320..4d63b24192b 100644 --- a/helm/defectdojo/Chart.yaml +++ b/helm/defectdojo/Chart.yaml @@ -1,8 +1,8 @@ apiVersion: v2 -appVersion: "2.42.0-dev" +appVersion: "2.41.3" description: A Helm chart for Kubernetes to install DefectDojo name: defectdojo -version: 1.6.165-dev +version: 1.6.165 icon: https://www.defectdojo.org/img/favicon.ico maintainers: - name: madchap