diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index 2880f738fb1..e52b1a3a04a 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -8,10 +8,9 @@ import tagulous from django.conf import settings from django.contrib.auth.models import Permission -from django.core.exceptions import PermissionDenied, ValidationError +from django.core.exceptions import ValidationError from django.db import transaction from django.db.utils import IntegrityError -from django.urls import reverse from django.utils import timezone from django.utils.translation import gettext_lazy as _ from drf_spectacular.utils import extend_schema_field @@ -19,8 +18,6 @@ from rest_framework.exceptions import NotFound from rest_framework.exceptions import ValidationError as RestFrameworkValidationError -import dojo.risk_acceptance.helper as ra_helper -from dojo.finding.queries import get_authorized_findings from dojo.importers.auto_create_context import AutoCreateContextManager from dojo.importers.base_importer import BaseImporter from dojo.importers.default_importer import DefaultImporter @@ -43,13 +40,10 @@ Language_Type, Languages, Network_Locations, - Note_Type, - NoteHistory, Notes, Product, Product_API_Scan_Configuration, Regulation, - Risk_Acceptance, SLA_Configuration, Sonarqube_Issue, Sonarqube_Issue_Transition, @@ -291,92 +285,20 @@ def validate(self, data): return data -from dojo.user.api.serializer import ( # noqa: E402, F401 -- backward compat + prefetcher discovery - AddUserSerializer, - UserContactInfoSerializer, - UserProfileSerializer, - UserSerializer, - UserStubSerializer, -) - - -class NoteTypeSerializer(serializers.ModelSerializer): - class Meta: - model = Note_Type - fields = "__all__" - - -class NoteHistorySerializer(serializers.ModelSerializer): - current_editor = UserStubSerializer(read_only=True) - note_type = NoteTypeSerializer(read_only=True, many=False) - - class Meta: - model = NoteHistory - fields = "__all__" - - -class NoteSerializer(serializers.ModelSerializer): - author = UserStubSerializer(many=False, read_only=True) - editor = UserStubSerializer(read_only=True, many=False, allow_null=True) - history = NoteHistorySerializer(read_only=True, many=True) - note_type = NoteTypeSerializer(read_only=True, many=False) - - def update(self, instance, validated_data): - instance.entry = validated_data.get("entry") - instance.edited = True - instance.editor = self.context["request"].user - instance.edit_time = timezone.now() - history = NoteHistory( - data=instance.entry, - time=instance.edit_time, - current_editor=instance.editor, - ) - history.save() - instance.history.add(history) - instance.save() - return instance - - class Meta: - model = Notes - fields = "__all__" - - -class FileSerializer(serializers.ModelSerializer): - file = serializers.FileField(required=True) - - class Meta: - model = FileUpload - fields = "__all__" - - def validate(self, data): - if file := data.get("file"): - # the clean will validate the file extensions and raise a Validation error if the extensions are not accepted - FileUpload(title=file.name, file=file).clean() - return data - return None - - -class RawFileSerializer(serializers.ModelSerializer): - file = serializers.FileField(required=True) - - class Meta: - model = FileUpload - fields = ["file"] - - -class RiskAcceptanceProofSerializer(serializers.ModelSerializer): - path = serializers.FileField(required=True) - - class Meta: - model = Risk_Acceptance - fields = ["path"] - - # Engagement serializers live in dojo/engagement/api/serializer.py. # EngagementSerializer is re-exported here because ReportGenerateSerializer and # RiskAcceptanceSerializer (below) still reference it. The other engagement # serializers are imported directly from dojo.engagement.api by their consumers. from dojo.engagement.api.serializer import EngagementSerializer # noqa: E402 -- backward compat +from dojo.file_uploads.api.serializer import ( # noqa: E402, F401 -- re-export; prefetcher + lazy consumers in finding/test/engagement + FileSerializer, + RawFileSerializer, +) +from dojo.note_type.api.serializer import NoteTypeSerializer # noqa: E402, F401 -- re-export for prefetcher discovery +from dojo.notes.api.serializer import ( # noqa: E402, F401 -- re-export; prefetcher + RiskAcceptanceToNotesSerializer + lazy consumers + NoteHistorySerializer, + NoteSerializer, +) # Product serializers live in dojo/product/api/serializer.py. ProductSerializer is # re-exported because ReportGenerateSerializer (below) still references it; @@ -388,13 +310,13 @@ class Meta: ProductSerializer, ) from dojo.product_type.api.serializer import ProductTypeSerializer # noqa: E402 - - -class RiskAcceptanceToNotesSerializer(serializers.Serializer): - risk_acceptance_id = serializers.PrimaryKeyRelatedField( - queryset=Risk_Acceptance.objects.all(), many=False, allow_null=True, - ) - notes = NoteSerializer(many=True) +from dojo.user.api.serializer import ( # noqa: E402, F401 -- backward compat + prefetcher discovery + AddUserSerializer, + UserContactInfoSerializer, + UserProfileSerializer, + UserSerializer, + UserStubSerializer, +) class AppAnalysisSerializer(serializers.ModelSerializer): @@ -446,118 +368,17 @@ class Meta: fields = "__all__" +# Risk acceptance serializers live in dojo/risk_acceptance/api/serializer.py. Re-exported here +# for backward compat: RiskAcceptanceSerializer is lazy-imported by dojo/finding/api/serializer.py +# (schema overrides); the ModelSerializers must also stay discoverable by the prefetcher. +from dojo.risk_acceptance.api.serializer import ( # noqa: E402 -- backward compat / prefetcher discovery + RiskAcceptanceProofSerializer, # noqa: F401 + RiskAcceptanceSerializer, # noqa: F401 -- lazy-imported by finding schema overrides + prefetcher + RiskAcceptanceToNotesSerializer, # noqa: F401 +) from dojo.test.api.serializer import TestSerializer # noqa: E402 -- backward compat re-export -class RiskAcceptanceSerializer(serializers.ModelSerializer): - path = serializers.SerializerMethodField() - - def create(self, validated_data): - instance = super().create(validated_data) - user = getattr(self.context.get("request", None), "user", None) - ra_helper.add_findings_to_risk_acceptance(user, instance, instance.accepted_findings.all()) - - # Add risk acceptance to engagement - # This is fine as Pro has its own model + relationshop to track links with engagements. - if instance.accepted_findings.exists(): - engagement = instance.accepted_findings.first().test.engagement - engagement.risk_acceptance.add(instance) - - return instance - - def update(self, instance, validated_data): - # Determine findings to risk accept, and findings to unaccept risk - existing_findings = Finding.objects.filter(risk_acceptance=self.instance.id) - new_findings_ids = [x.id for x in validated_data.get("accepted_findings", [])] - new_findings = Finding.objects.filter(id__in=new_findings_ids) - findings_to_add = set(new_findings) - set(existing_findings) - findings_to_remove = set(existing_findings) - set(new_findings) - findings_to_add = Finding.objects.filter(id__in=[x.id for x in findings_to_add]) - findings_to_remove = Finding.objects.filter(id__in=[x.id for x in findings_to_remove]) - # Make the update in the database - instance = super().update(instance, validated_data) - user = getattr(self.context.get("request", None), "user", None) - # Add the new findings - ra_helper.add_findings_to_risk_acceptance(user, instance, findings_to_add) - # Remove the ones that were not present in the payload - for finding in findings_to_remove: - ra_helper.remove_finding_from_risk_acceptance(user, instance, finding) - - # Handle orphaned risk acceptances: link to engagement if it now has findings - # This is fine as Pro has its own model + relationshop to track links with engagements. - if instance.accepted_findings.exists() and not instance.engagement: - engagement = instance.accepted_findings.first().test.engagement - engagement.risk_acceptance.add(instance) - - return instance - - @extend_schema_field(serializers.CharField()) - def get_path(self, obj): - engagement = Engagement.objects.filter( - risk_acceptance__id__in=[obj.id], - ).first() - path = "No proof has been supplied" - if engagement and obj.filename() is not None: - path = reverse( - "download_risk_acceptance", args=(engagement.id, obj.id), - ) - request = self.context.get("request") - if request: - path = request.build_absolute_uri(path) - return path - - @extend_schema_field(serializers.IntegerField()) - def get_engagement(self, obj): - engagement = Engagement.objects.filter( - risk_acceptance__id__in=[obj.id], - ).first() - return EngagementSerializer(read_only=True).to_representation( - engagement, - ) - - def validate(self, data): - def validate_findings_have_same_engagement(finding_objects: list[Finding]): - engagements = finding_objects.values_list("test__engagement__id", flat=True).distinct().count() - if engagements > 1: - msg = "You are not permitted to add findings from multiple engagements" - raise PermissionDenied(msg) - - findings = data.get("accepted_findings", []) - findings_ids = [x.id for x in findings] - finding_objects = Finding.objects.filter(id__in=findings_ids) - authed_findings = get_authorized_findings("edit").filter(id__in=findings_ids) - if len(findings) != len(authed_findings): - msg = "You are not permitted to add one or more selected findings to this risk acceptance" - raise PermissionDenied(msg) - if self.context["request"].method == "POST": - validate_findings_have_same_engagement(finding_objects) - - # Validate product allows full risk acceptance BEFORE creating instance - if finding_objects.exists(): - engagement = finding_objects.first().test.engagement - if not engagement.product.enable_full_risk_acceptance: - msg = "Full risk acceptance is not enabled for this product" - raise PermissionDenied(msg) - elif self.context["request"].method in {"PATCH", "PUT"}: - # Use the reverse relation instead of filtering - existing_findings = self.instance.accepted_findings.all() - existing_and_new_findings = existing_findings | finding_objects - validate_findings_have_same_engagement(existing_and_new_findings) - - # Explicit check to prevent engagement switching - risk_acceptance_engagement = self.instance.engagement - if risk_acceptance_engagement and finding_objects.exists(): - new_findings_engagement = finding_objects.first().test.engagement - if risk_acceptance_engagement.id != new_findings_engagement.id: - msg = f"Risk Acceptance belongs to engagement {risk_acceptance_engagement.id}. Cannot add findings from engagement {new_findings_engagement.id}" - raise ValidationError(msg) - return data - - class Meta: - model = Risk_Acceptance - fields = "__all__" - - class CommonImportScanSerializer(serializers.Serializer): scan_date = serializers.DateField( required=False, diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 97816b34f6c..d4b763ea167 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -1,7 +1,5 @@ import logging -import mimetypes from datetime import datetime -from pathlib import Path import pghistory from dateutil.relativedelta import relativedelta @@ -10,8 +8,6 @@ from django.core.exceptions import ValidationError from django.db import IntegrityError from django.db.models.query import QuerySet as DjangoQuerySet -from django.http import FileResponse -from django.urls import reverse from django.utils import timezone from django_filters.rest_framework import DjangoFilterBackend from drf_spectacular.renderers import OpenApiJsonRenderer2 @@ -41,7 +37,6 @@ from dojo.filters import ( ApiAppAnalysisFilter, ApiDojoMetaFilter, - ApiRiskAcceptanceFilter, ) from dojo.finding.ui.filters import ( ReportFindingFilter, @@ -61,12 +56,8 @@ Language_Type, Languages, Network_Locations, - Note_Type, - NoteHistory, - Notes, Product, Regulation, - Risk_Acceptance, SLA_Configuration, Sonarqube_Issue, Sonarqube_Issue_Transition, @@ -79,12 +70,10 @@ get_authorized_languages, get_authorized_products, ) -from dojo.reports.views import ( +from dojo.reports.ui.views import ( prefetch_related_findings_for_report, report_url_resolver, ) -from dojo.risk_acceptance.helper import remove_finding_from_risk_acceptance -from dojo.risk_acceptance.queries import get_authorized_risk_acceptances from dojo.test.queries import get_authorized_tests from dojo.user.utils import get_configuration_permissions_codenames from dojo.utils import ( @@ -92,7 +81,6 @@ get_celery_queue_length, get_celery_worker_status, get_system_setting, - process_tag_notifications, purge_celery_queue, purge_celery_queue_by_task_name, ) @@ -171,128 +159,6 @@ def finalize_response(self, request, response, *args, **kwargs): # @extend_schema_view(**schema_with_prefetch()) # Nested models with prefetch make the response schema too long for Swagger UI -class RiskAcceptanceViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.RiskAcceptanceSerializer - queryset = Risk_Acceptance.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_class = ApiRiskAcceptanceFilter - - permission_classes = ( - IsAuthenticated, - permissions.UserHasRiskAcceptancePermission, - ) - - def destroy(self, request, pk=None): - instance = self.get_object() - # Remove any findings on the risk acceptance - for finding in instance.accepted_findings.all(): - remove_finding_from_risk_acceptance(request.user, instance, finding) - # return the response of the object being deleted - return super().destroy(request, pk=pk) - - def get_queryset(self): - return ( - get_authorized_risk_acceptances("edit") - .prefetch_related( - "notes", "engagement_set", "owner", "accepted_findings", - ) - .distinct() - ) - - @extend_schema( - methods=["GET"], - responses={ - status.HTTP_200_OK: serializers.RiskAcceptanceToNotesSerializer, - }, - ) - @extend_schema( - methods=["POST"], - request=serializers.AddNewNoteOptionSerializer, - responses={status.HTTP_201_CREATED: serializers.NoteSerializer}, - ) - @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasRiskAcceptanceRelatedObjectPermission)) - def notes(self, request, pk=None): - risk_acceptance = self.get_object() - if request.method == "POST": - new_note = serializers.AddNewNoteOptionSerializer(data=request.data) - if new_note.is_valid(): - entry = new_note.validated_data["entry"] - private = new_note.validated_data.get("private", False) - note_type = new_note.validated_data.get("note_type", None) - else: - return Response(new_note.errors, status=status.HTTP_400_BAD_REQUEST) - - notes = risk_acceptance.notes.filter(note_type=note_type).first() - if notes and note_type and note_type.is_single: - return Response("Only one instance of this note_type allowed on a risk acceptance.", status=status.HTTP_400_BAD_REQUEST) - - author = request.user - note = Notes(entry=entry, author=author, private=private, note_type=note_type) - note.save() - history = NoteHistory.objects.create(data=note.entry, time=note.date, current_editor=note.author) - note.history.add(history) - risk_acceptance.notes.add(note) - engagement = risk_acceptance.engagement - if engagement: - process_tag_notifications( - request=request, - note=note, - parent_url=request.build_absolute_uri( - reverse("view_risk_acceptance", args=(engagement.id, risk_acceptance.id)), - ), - parent_title=f"Risk Acceptance: {risk_acceptance.name}", - ) - - serialized_note = serializers.NoteSerializer( - {"author": author, "entry": entry, "private": private}, - ) - return Response(serialized_note.data, status=status.HTTP_201_CREATED) - - notes = risk_acceptance.notes.all() - serialized_notes = serializers.RiskAcceptanceToNotesSerializer( - {"risk_acceptance_id": risk_acceptance, "notes": notes}, - ) - return Response(serialized_notes.data, status=status.HTTP_200_OK) - - @extend_schema( - methods=["GET"], - responses={ - status.HTTP_200_OK: serializers.RiskAcceptanceProofSerializer, - }, - ) - @action(detail=True, methods=["get"], permission_classes=(IsAuthenticated, permissions.UserHasRiskAcceptanceRelatedObjectPermission)) - def download_proof(self, request, pk=None): - risk_acceptance = self.get_object() - # Get the file object - file_object = risk_acceptance.path - if file_object is None or risk_acceptance.filename() is None: - return Response( - {"error": "Proof has not provided to this risk acceptance..."}, - status=status.HTTP_404_NOT_FOUND, - ) - # Get the path of the file in media root - file_path = Path(settings.MEDIA_ROOT) / file_object.name - # NOTE: FileResponse takes ownership of closing the file handle when the response is closed. - # Explicitly register the closer to avoid potential resource leaks and satisfy static analyzers. - file_handle = file_path.open("rb") - # send file - response = FileResponse( - file_handle, - content_type=mimetypes.guess_type(str(file_path))[0] or "application/octet-stream", - status=status.HTTP_200_OK, - ) - if hasattr(response, "_resource_closers"): - response._resource_closers.append(file_handle.close) - response["Content-Length"] = file_object.size - response[ - "Content-Disposition" - ] = f'attachment; filename="{risk_acceptance.filename()}"' - - return response - - # These are technologies in the UI and the API! # Authorization: object-based @extend_schema_view(**schema_with_prefetch()) @@ -677,49 +543,8 @@ def perform_create(self, serializer): pghistory.context(test_id=test_id_from_response) -# Authorization: configuration -class NoteTypeViewSet( - DojoModelViewSet, -): - serializer_class = serializers.NoteTypeSerializer - queryset = Note_Type.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "id", - "name", - "description", - "is_single", - "is_active", - "is_mandatory", - ] - permission_classes = (permissions.UserHasConfigurationPermissionSuperuser,) - - def get_queryset(self): - return Note_Type.objects.all().order_by("id") - - -# Authorization: superuser -class NotesViewSet( - mixins.UpdateModelMixin, - viewsets.ReadOnlyModelViewSet, -): - serializer_class = serializers.NoteSerializer - queryset = Notes.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "id", - "entry", - "author", - "private", - "date", - "edited", - "edit_time", - "editor", - ] - permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) - - def get_queryset(self): - return Notes.objects.all().order_by("id") +from dojo.note_type.api.views import NoteTypeViewSet # noqa: E402, F401 -- re-export; urls.py imports by name +from dojo.notes.api.views import NotesViewSet # noqa: E402, F401 -- re-export; urls.py imports by name def report_generate(request, obj, options): diff --git a/dojo/endpoint/ui/views.py b/dojo/endpoint/ui/views.py index 531f21cffd5..9e11a855118 100644 --- a/dojo/endpoint/ui/views.py +++ b/dojo/endpoint/ui/views.py @@ -29,7 +29,7 @@ ) from dojo.models import DojoMeta, Endpoint, Endpoint_Status, Finding, Product from dojo.query_utils import build_count_subquery -from dojo.reports.views import generate_report +from dojo.reports.ui.views import generate_report from dojo.utils import ( Product_Tab, add_breadcrumb, diff --git a/dojo/file_uploads/__init__.py b/dojo/file_uploads/__init__.py index e69de29bb2d..4134e5e9d54 100644 --- a/dojo/file_uploads/__init__.py +++ b/dojo/file_uploads/__init__.py @@ -0,0 +1 @@ +import dojo.file_uploads.admin # noqa: F401 diff --git a/dojo/file_uploads/admin.py b/dojo/file_uploads/admin.py new file mode 100644 index 00000000000..0add1a26c56 --- /dev/null +++ b/dojo/file_uploads/admin.py @@ -0,0 +1,6 @@ +from django.contrib import admin + +from dojo.file_uploads.models import FileAccessToken, FileUpload + +admin.site.register(FileUpload) +admin.site.register(FileAccessToken) diff --git a/dojo/file_uploads/api/__init__.py b/dojo/file_uploads/api/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/file_uploads/api/serializer.py b/dojo/file_uploads/api/serializer.py new file mode 100644 index 00000000000..3f813dd7beb --- /dev/null +++ b/dojo/file_uploads/api/serializer.py @@ -0,0 +1,26 @@ +from rest_framework import serializers + +from dojo.file_uploads.models import FileUpload + + +class FileSerializer(serializers.ModelSerializer): + file = serializers.FileField(required=True) + + class Meta: + model = FileUpload + fields = "__all__" + + def validate(self, data): + if file := data.get("file"): + # the clean will validate the file extensions and raise a Validation error if the extensions are not accepted + FileUpload(title=file.name, file=file).clean() + return data + return None + + +class RawFileSerializer(serializers.ModelSerializer): + file = serializers.FileField(required=True) + + class Meta: + model = FileUpload + fields = ["file"] diff --git a/dojo/file_uploads/models.py b/dojo/file_uploads/models.py new file mode 100644 index 00000000000..e5fa8e5a66f --- /dev/null +++ b/dojo/file_uploads/models.py @@ -0,0 +1,103 @@ +from pathlib import Path +from uuid import uuid4 + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.core.files.base import ContentFile +from django.db import models +from django.utils.translation import gettext_lazy as _ + +from dojo.models import ( # UniqueUploadNameProvider kept in dojo.models for migration upload_to path stability + UniqueUploadNameProvider, + copy_model_util, +) + + +class FileUpload(models.Model): + title = models.CharField(max_length=100, unique=True) + file = models.FileField(upload_to=UniqueUploadNameProvider("uploaded_files")) + + def delete(self, *args, **kwargs): + """Delete the model and remove the file from storage.""" + storage = self.file.storage + path = self.file.path + super().delete(*args, **kwargs) + if path and storage.exists(path): + storage.delete(path) + + def copy(self): + copy = copy_model_util(self) + # Add unique modifier to file name + # Truncate title to ensure it doesn't exceed max_length (100) when appending suffix + # Suffix " - clone-{8 chars}" is 17 characters, so truncate to 83 chars + clone_suffix = f" - clone-{str(uuid4())[:8]}" + max_title_length = 100 - len(clone_suffix) + truncated_title = self.title[:max_title_length] if len(self.title) > max_title_length else self.title + copy.title = f"{truncated_title}{clone_suffix}" + # Create new unique file name + current_url = self.file.url + _, current_full_filename = current_url.rsplit("/", 1) + _, extension = current_full_filename.split(".", 1) + new_file = ContentFile(self.file.read(), name=f"{uuid4()}.{extension}") + copy.file = new_file + copy.save() + + return copy + + def get_accessible_url(self, obj, obj_id): + from dojo.engagement.models import Engagement # noqa: PLC0415 -- lazy import, avoids circular dependency + from dojo.finding.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + from dojo.test.models import Test # noqa: PLC0415 -- lazy import, avoids circular dependency + if isinstance(obj, Engagement): + obj_type = "Engagement" + elif isinstance(obj, Test): + obj_type = "Test" + elif isinstance(obj, Finding): + obj_type = "Finding" + + return f"access_file/{self.id}/{obj_id}/{obj_type}" + + def clean(self): + if not self.title: + self.title = "" + + valid_extensions = settings.FILE_UPLOAD_TYPES + + # why does this not work with self.file.... + file_name = self.file.url if self.file else self.title + if Path(file_name).suffix.lower() not in valid_extensions: + if accepted_extensions := f"{', '.join(valid_extensions)}": + msg = ( + _("Unsupported extension. Supported extensions are as follows: %s") % accepted_extensions + ) + else: + msg = ( + _("File uploads are prohibited due to the list of acceptable file extensions being empty") + ) + raise ValidationError(msg) + + +class FileAccessToken(models.Model): + + """ + This will allow reports to request the images without exposing the + media root to the world without + authentication + """ + + user = models.ForeignKey("dojo.Dojo_User", null=False, blank=False, on_delete=models.CASCADE) + file = models.ForeignKey("dojo.FileUpload", null=False, blank=False, on_delete=models.CASCADE) + token = models.CharField(max_length=255) + size = models.CharField(max_length=9, + choices=( + ("small", "Small"), + ("medium", "Medium"), + ("large", "Large"), + ("thumbnail", "Thumbnail"), + ("original", "Original")), + default="medium") + + def save(self, *args, **kwargs): + if not self.token: + self.token = uuid4() + return super().save(*args, **kwargs) diff --git a/dojo/filters.py b/dojo/filters.py index b6ff6815f6f..b69f2722977 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -51,7 +51,6 @@ Note_Type, Product, Product_Type, - Risk_Acceptance, Test, Vulnerability_Id, ) @@ -1273,41 +1272,6 @@ class Meta: exclude = ["last_modified", "endpoint", "finding"] -class ApiRiskAcceptanceFilter(DojoFilter): - created = DateRangeFilter() - updated = DateRangeFilter() - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("name", "name"), - ("created", "created"), - ("updated", "updated"), - ), - ) - - class Meta: - model = Risk_Acceptance - fields = { - "name": ["exact", "icontains"], - "accepted_findings": ["exact"], - "recommendation": ["exact"], - "recommendation_details": ["exact", "icontains"], - "decision": ["exact"], - "decision_details": ["exact", "icontains"], - "accepted_by": ["exact", "icontains"], - "owner": ["exact"], - "expiration_date": ["exact", "gt", "lt", "gte", "lte"], - "expiration_date_warned": ["exact", "gt", "lt", "gte", "lte"], - "expiration_date_handled": ["exact", "gt", "lt", "gte", "lte"], - "reactivate_expired": ["exact"], - "restart_sla_expired": ["exact"], - "notes": ["exact"], - "created": ["exact", "gt", "lt", "gte", "lte"], - "updated": ["exact", "gt", "lt", "gte", "lte"], - } - - class ApiAppAnalysisFilter(DojoFilter): tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") tags = CharFieldInFilter( diff --git a/dojo/forms.py b/dojo/forms.py index 999c9bf16d8..0a349d0c481 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -19,7 +19,6 @@ from tagulous.forms import TagField from dojo.endpoint.utils import validate_endpoints_to_add -from dojo.finding.queries import get_authorized_findings from dojo.github.ui.forms import ( # noqa: F401 -- backward compat DeleteGITHUBConfForm, ExpressGITHUBForm, @@ -54,15 +53,11 @@ DojoMeta, Endpoint, FileUpload, - Finding, Finding_Group, - Note_Type, - Notes, Objects_Product, Product_API_Scan_Configuration, Product_Type, Regulation, - Risk_Acceptance, SLA_Configuration, Test_Type, User, @@ -72,7 +67,6 @@ from dojo.user.utils import get_configuration_permissions_fields from dojo.utils import ( get_password_requirements_string, - get_system_setting, is_finding_groups_enabled, is_scan_file_too_large, ) @@ -221,36 +215,11 @@ def __init__(self, *args, user=None, **kwargs): ) -class NoteTypeForm(forms.ModelForm): - description = forms.CharField(widget=forms.Textarea(attrs={}), - required=True) - - class Meta: - model = Note_Type - fields = ["name", "description", "is_single", "is_mandatory"] - - -class EditNoteTypeForm(NoteTypeForm): - - def __init__(self, *args, **kwargs): - is_single = kwargs.pop("is_single") - super().__init__(*args, **kwargs) - if is_single is False: - self.fields["is_single"].widget = forms.HiddenInput() - - -class DisableOrEnableNoteTypeForm(NoteTypeForm): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.fields["name"].disabled = True - self.fields["description"].disabled = True - self.fields["is_single"].disabled = True - self.fields["is_mandatory"].disabled = True - self.fields["is_active"].disabled = True - - class Meta: - model = Note_Type - fields = "__all__" +from dojo.note_type.ui.forms import ( # noqa: E402, F401 -- backward compat + DisableOrEnableNoteTypeForm, + EditNoteTypeForm, + NoteTypeForm, +) class DojoMetaDataForm(forms.ModelForm): @@ -578,64 +547,6 @@ def clean(self): raise ValidationError(msg) -class EditRiskAcceptanceForm(forms.ModelForm): - # unfortunately django forces us to repeat many things here. choices, default, required etc. - recommendation = forms.ChoiceField(choices=Risk_Acceptance.TREATMENT_CHOICES, initial=Risk_Acceptance.TREATMENT_ACCEPT, widget=forms.RadioSelect, label="Security Recommendation") - decision = forms.ChoiceField(choices=Risk_Acceptance.TREATMENT_CHOICES, initial=Risk_Acceptance.TREATMENT_ACCEPT, widget=forms.RadioSelect) - - path = forms.FileField(label="Proof", required=False, widget=forms.widgets.FileInput(attrs={"accept": ", ".join(settings.FILE_IMPORT_TYPES)})) - expiration_date = forms.DateTimeField(required=False, widget=forms.TextInput(attrs={"class": "datepicker"})) - - class Meta: - model = Risk_Acceptance - exclude = ["accepted_findings", "notes"] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.fields["path"].help_text = f"Existing proof uploaded: {self.instance.filename()}" if self.instance.filename() else "None" - self.fields["expiration_date_warned"].disabled = True - self.fields["expiration_date_handled"].disabled = True - - def clean_path(self): - if (data := self.cleaned_data.get("path")) is not None: - ext = Path(data.name).suffix # [0] returns path+filename - valid_extensions = settings.FILE_UPLOAD_TYPES - if ext.lower() not in valid_extensions: - if accepted_extensions := f"{', '.join(valid_extensions)}": - msg = f"Unsupported extension. Supported extensions are as follows: {accepted_extensions}" - else: - msg = "File uploads are prohibited due to the list of acceptable file extensions being empty" - raise ValidationError(msg) - return data - - -class RiskAcceptanceForm(EditRiskAcceptanceForm): - accepted_findings = forms.ModelMultipleChoiceField( - queryset=Finding.objects.none(), required=True, - widget=forms.widgets.SelectMultiple(attrs={"size": 10}), - help_text=("Active, verified findings listed, please select to add findings.")) - notes = forms.CharField(required=False, max_length=2400, - widget=forms.Textarea, - label="Notes") - - class Meta: - model = Risk_Acceptance - fields = "__all__" - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - expiration_delta_days = get_system_setting("risk_acceptance_form_default_days") - logger.debug("expiration_delta_days: %i", expiration_delta_days) - if expiration_delta_days > 0: - expiration_date = timezone.now().date() + relativedelta(days=expiration_delta_days) - # logger.debug('setting default expiration_date: %s', expiration_date) - self.fields["expiration_date"].initial = expiration_date - # self.fields['path'].help_text = 'Existing proof uploaded: %s' % self.instance.filename() if self.instance.filename() else 'None' - self.fields["accepted_findings"].queryset = get_authorized_findings("edit") - if disclaimer := get_system_setting("disclaimer_notes"): - self.disclaimer = disclaimer.strip() - - class BaseManageFileFormSet(forms.BaseModelFormSet): def clean(self): """Validate the IP/Mask combo is in CIDR format""" @@ -665,12 +576,13 @@ def clean(self): ManageFileFormSet = modelformset_factory(FileUpload, extra=3, max_num=10, fields=["title", "file"], can_delete=True, formset=BaseManageFileFormSet) -class ReplaceRiskAcceptanceProofForm(forms.ModelForm): - path = forms.FileField(label="Proof", required=True, widget=forms.widgets.FileInput(attrs={"accept": ".jpg,.png,.pdf"})) - - class Meta: - model = Risk_Acceptance - fields = ["path"] +# Risk acceptance forms live in dojo/risk_acceptance/ui/forms.py. Re-exported here for +# backward compat — engagement's UI views import them from dojo.forms. +from dojo.risk_acceptance.ui.forms import ( # noqa: E402, F401 -- backward compat + EditRiskAcceptanceForm, + ReplaceRiskAcceptanceProofForm, + RiskAcceptanceForm, +) class CheckForm(forms.ModelForm): @@ -715,44 +627,14 @@ class Meta: EngForm, ExistingEngagementForm, ) +from dojo.notes.ui.forms import ( # noqa: E402, F401 -- backward compat + DeleteNoteForm, + NoteForm, + TypedNoteForm, +) from dojo.test.ui.forms import TestForm # noqa: E402, F401 -- backward compat -class NoteForm(forms.ModelForm): - entry = forms.CharField(max_length=2400, widget=forms.Textarea(attrs={"rows": 4, "cols": 15}), - label="Notes:") - - class Meta: - model = Notes - fields = ["entry", "private"] - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if disclaimer := get_system_setting("disclaimer_notes"): - self.disclaimer = disclaimer.strip() - - -class TypedNoteForm(NoteForm): - - def __init__(self, *args, **kwargs): - queryset = kwargs.pop("available_note_types") - super().__init__(*args, **kwargs) - self.fields["note_type"] = forms.ModelChoiceField(queryset=queryset, label="Note Type", required=True) - - class Meta: - model = Notes - fields = ["note_type", "entry", "private"] - - -class DeleteNoteForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Notes - fields = ["id"] - - class WeeklyMetricsForm(forms.Form): dates = forms.ChoiceField() @@ -900,31 +782,6 @@ class Meta: "date_joined", "user_permissions"] -class ReportOptionsForm(forms.Form): - yes_no = (("0", "No"), ("1", "Yes")) - include_finding_notes = forms.ChoiceField(choices=yes_no, label="Finding Notes") - include_finding_images = forms.ChoiceField(choices=yes_no, label="Finding Images") - include_executive_summary = forms.ChoiceField(choices=yes_no, label="Executive Summary") - include_table_of_contents = forms.ChoiceField(choices=yes_no, label="Table of Contents") - include_disclaimer = forms.ChoiceField(choices=yes_no, label="Disclaimer") - report_type = forms.ChoiceField(choices=(("HTML", "HTML"),)) - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if get_system_setting("disclaimer_reports_forced"): - self.fields["include_disclaimer"].disabled = True - self.fields["include_disclaimer"].initial = "1" # represents yes - self.fields["include_disclaimer"].help_text = "Administrator of the system enforced placement of disclaimer in all reports. You are not able exclude disclaimer from this report." - - -class CustomReportOptionsForm(forms.Form): - yes_no = (("0", "No"), ("1", "Yes")) - report_name = forms.CharField(required=False, max_length=100) - include_finding_notes = forms.ChoiceField(required=False, choices=yes_no) - include_finding_images = forms.ChoiceField(choices=yes_no, label="Finding Images") - report_type = forms.ChoiceField(choices=(("HTML", "HTML"),)) - - from dojo.benchmark.ui.forms import ( # noqa: E402, F401 -- backward compat Benchmark_Product_SummaryForm, Benchmark_RequirementForm, diff --git a/dojo/models.py b/dojo/models.py index 651bebad557..c5eb30b0d15 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -5,11 +5,9 @@ from uuid import uuid4 import tagulous.admin -from django.conf import settings from django.contrib import admin from django.contrib.auth import get_user_model from django.core.exceptions import ValidationError -from django.core.files.base import ContentFile from django.db import models from django.db.models import Count from django.db.models.expressions import Case, When @@ -170,130 +168,19 @@ def get_current_datetime(): return timezone.now() -class Note_Type(models.Model): - name = models.CharField(max_length=100, unique=True) - description = models.CharField(max_length=200) - is_single = models.BooleanField(default=False, null=False) - is_active = models.BooleanField(default=True, null=False) - is_mandatory = models.BooleanField(default=True, null=False) - - def __str__(self): - return self.name - - -class NoteHistory(models.Model): - note_type = models.ForeignKey(Note_Type, null=True, blank=True, on_delete=models.CASCADE) - data = models.TextField() - time = models.DateTimeField(null=True, editable=False, - default=get_current_datetime) - current_editor = models.ForeignKey(Dojo_User, editable=False, null=True, on_delete=models.CASCADE) - - def copy(self): - copy = copy_model_util(self) - copy.save() - return copy - - -class Notes(models.Model): - note_type = models.ForeignKey(Note_Type, related_name="note_type", null=True, blank=True, on_delete=models.CASCADE) - entry = models.TextField() - date = models.DateTimeField(null=False, editable=False, - default=get_current_datetime) - author = models.ForeignKey(Dojo_User, related_name="editor_notes_set", editable=False, on_delete=models.CASCADE) - private = models.BooleanField(default=False) - edited = models.BooleanField(default=False) - editor = models.ForeignKey(Dojo_User, related_name="author_notes_set", editable=False, null=True, on_delete=models.CASCADE) - edit_time = models.DateTimeField(null=True, editable=False, - default=get_current_datetime) - history = models.ManyToManyField(NoteHistory, blank=True, - editable=False) - - class Meta: - ordering = ["-date"] - - def __str__(self): - return self.entry - - def copy(self): - copy = copy_model_util(self) - # Save the necessary ManyToMany relationships - old_history = list(self.history.all()) - # Save the object before setting any ManyToMany relationships - copy.save() - # Copy the history - for history in old_history: - copy.history.add(history.copy()) - - return copy - - -class FileUpload(models.Model): - title = models.CharField(max_length=100, unique=True) - file = models.FileField(upload_to=UniqueUploadNameProvider("uploaded_files")) - - def delete(self, *args, **kwargs): - """Delete the model and remove the file from storage.""" - storage = self.file.storage - path = self.file.path - super().delete(*args, **kwargs) - if path and storage.exists(path): - storage.delete(path) - - def copy(self): - copy = copy_model_util(self) - # Add unique modifier to file name - # Truncate title to ensure it doesn't exceed max_length (100) when appending suffix - # Suffix " - clone-{8 chars}" is 17 characters, so truncate to 83 chars - clone_suffix = f" - clone-{str(uuid4())[:8]}" - max_title_length = 100 - len(clone_suffix) - truncated_title = self.title[:max_title_length] if len(self.title) > max_title_length else self.title - copy.title = f"{truncated_title}{clone_suffix}" - # Create new unique file name - current_url = self.file.url - _, current_full_filename = current_url.rsplit("/", 1) - _, extension = current_full_filename.split(".", 1) - new_file = ContentFile(self.file.read(), name=f"{uuid4()}.{extension}") - copy.file = new_file - copy.save() - - return copy - - def get_accessible_url(self, obj, obj_id): - if isinstance(obj, Engagement): - obj_type = "Engagement" - elif isinstance(obj, Test): - obj_type = "Test" - elif isinstance(obj, Finding): - obj_type = "Finding" - - return f"access_file/{self.id}/{obj_id}/{obj_type}" - - def clean(self): - if not self.title: - self.title = "" - - valid_extensions = settings.FILE_UPLOAD_TYPES - - # why does this not work with self.file.... - file_name = self.file.url if self.file else self.title - if Path(file_name).suffix.lower() not in valid_extensions: - if accepted_extensions := f"{', '.join(valid_extensions)}": - msg = ( - _("Unsupported extension. Supported extensions are as follows: %s") % accepted_extensions - ) - else: - msg = ( - _("File uploads are prohibited due to the list of acceptable file extensions being empty") - ) - raise ValidationError(msg) - - +from dojo.file_uploads.models import FileAccessToken, FileUpload # noqa: E402, F401 -- re-export +from dojo.note_type.models import Note_Type # noqa: E402, F401 -- re-export +from dojo.notes.models import ( # noqa: E402, F401 -- re-export; Notes used by Risk_Acceptance.notes M2M below + NoteHistory, + Notes, +) from dojo.product.models import ( # noqa: E402 -- re-export; class-body FKs below reference these Product, Product_API_Scan_Configuration, # noqa: F401 -- re-export Product_Line, # noqa: F401 -- re-export ) from dojo.product_type.models import Product_Type # noqa: E402, F401 -- re-export +from dojo.reports.models import Report_Type # noqa: E402, F401 -- re-export from dojo.test.models import ( # noqa: E402 -- re-export; class-body FKs below reference these IMPORT_ACTIONS, # noqa: F401 -- re-export IMPORT_CLOSED_FINDING, # noqa: F401 -- re-export @@ -307,10 +194,6 @@ def clean(self): ) -class Report_Type(models.Model): - name = models.CharField(max_length=255) - - class DojoMeta(models.Model): name = models.CharField(max_length=120) value = models.CharField(max_length=300) @@ -608,133 +491,7 @@ def get_breadcrumb(self): return bc -class Risk_Acceptance(models.Model): - TREATMENT_ACCEPT = "A" - TREATMENT_AVOID = "V" - TREATMENT_MITIGATE = "M" - TREATMENT_FIX = "F" - TREATMENT_TRANSFER = "T" - - TREATMENT_TRANSLATIONS = { - TREATMENT_ACCEPT: _("Accept (The risk is acknowledged, yet remains)"), - TREATMENT_AVOID: _("Avoid (Do not engage with whatever creates the risk)"), - TREATMENT_MITIGATE: _("Mitigate (The risk still exists, yet compensating controls make it less of a threat)"), - TREATMENT_FIX: _("Fix (The risk is eradicated)"), - TREATMENT_TRANSFER: _("Transfer (The risk is transferred to a 3rd party)"), - } - - TREATMENT_CHOICES = [ - (TREATMENT_ACCEPT, TREATMENT_TRANSLATIONS[TREATMENT_ACCEPT]), - (TREATMENT_AVOID, TREATMENT_TRANSLATIONS[TREATMENT_AVOID]), - (TREATMENT_MITIGATE, TREATMENT_TRANSLATIONS[TREATMENT_MITIGATE]), - (TREATMENT_FIX, TREATMENT_TRANSLATIONS[TREATMENT_FIX]), - (TREATMENT_TRANSFER, TREATMENT_TRANSLATIONS[TREATMENT_TRANSFER]), - ] - - name = models.CharField(max_length=300, null=False, blank=False, help_text=_("Descriptive name which in the future may also be used to group risk acceptances together across engagements and products")) - - accepted_findings = models.ManyToManyField(Finding) - - recommendation = models.CharField(choices=TREATMENT_CHOICES, max_length=2, null=False, default=TREATMENT_FIX, help_text=_("Recommendation from the security team."), verbose_name=_("Security Recommendation")) - - recommendation_details = models.TextField(null=True, - blank=True, - help_text=_("Explanation of security recommendation"), verbose_name=_("Security Recommendation Details")) - - decision = models.CharField(choices=TREATMENT_CHOICES, max_length=2, null=False, default=TREATMENT_ACCEPT, help_text=_("Risk treatment decision by risk owner")) - decision_details = models.TextField(default=None, blank=True, null=True, help_text=_("If a compensating control exists to mitigate the finding or reduce risk, then list the compensating control(s).")) - - accepted_by = models.CharField(max_length=200, default=None, null=True, blank=True, verbose_name=_("Accepted By"), help_text=_("The person that accepts the risk, can be outside of DefectDojo.")) - path = models.FileField(upload_to="risk/%Y/%m/%d", - editable=True, null=True, - blank=True, verbose_name=_("Proof")) - owner = models.ForeignKey(Dojo_User, editable=True, on_delete=models.RESTRICT, help_text=_("User in DefectDojo owning this acceptance. Only the owner and staff users can edit the risk acceptance.")) - - expiration_date = models.DateTimeField(default=None, null=True, blank=True, help_text=_("When the risk acceptance expires, the findings will be reactivated (unless disabled below).")) - expiration_date_warned = models.DateTimeField(default=None, null=True, blank=True, help_text=_("(readonly) Date at which notice about the risk acceptance expiration was sent.")) - expiration_date_handled = models.DateTimeField(default=None, null=True, blank=True, help_text=_("(readonly) When the risk acceptance expiration was handled (manually or by the daily job).")) - reactivate_expired = models.BooleanField(null=False, blank=False, default=True, verbose_name=_("Reactivate findings on expiration"), help_text=_("Reactivate findings when risk acceptance expires?")) - restart_sla_expired = models.BooleanField(default=False, null=False, verbose_name=_("Restart SLA on expiration"), help_text=_("When enabled, the SLA for findings is restarted when the risk acceptance expires.")) - - notes = models.ManyToManyField(Notes, editable=False) - created = models.DateTimeField(auto_now_add=True, null=False) - updated = models.DateTimeField(auto_now=True, editable=False) - - def __str__(self): - return str(self.name) - - def filename(self): - # logger.debug('path: "%s"', self.path) - if not self.path: - return None - return Path(self.path.name).name - - @property - def name_and_expiration_info(self): - return str(self.name) + (" (expired " if self.is_expired else " (expires ") + (timezone.localtime(self.expiration_date).strftime("%b %d, %Y") if self.expiration_date else "Never") + ")" - - def get_breadcrumbs(self): - bc = self.engagement_set.first().get_breadcrumbs() - bc += [{"title": str(self), - "url": reverse("view_risk_acceptance", args=( - self.engagement_set.first().product.id, self.id))}] - return bc - - @property - def is_expired(self): - return self.expiration_date_handled is not None - - # relationship is many to many, but we use it as one-to-many - @property - def engagement(self): - engs = self.engagement_set.all() - if engs: - return engs[0] - - return None - - def copy(self, engagement=None): - copy = copy_model_util(self) - # Save the necessary ManyToMany relationships - old_notes = list(self.notes.all()) - old_accepted_findings_hash_codes = [finding.hash_code for finding in self.accepted_findings.all()] - # Save the object before setting any ManyToMany relationships - copy.save() - # Copy the notes - for notes in old_notes: - copy.notes.add(notes.copy()) - # Assign any accepted findings - if engagement: - new_accepted_findings = Finding.objects.filter(test__engagement=engagement, hash_code__in=old_accepted_findings_hash_codes, risk_accepted=True).distinct() - copy.accepted_findings.set(new_accepted_findings) - return copy - - -class FileAccessToken(models.Model): - - """ - This will allow reports to request the images without exposing the - media root to the world without - authentication - """ - - user = models.ForeignKey(Dojo_User, null=False, blank=False, on_delete=models.CASCADE) - file = models.ForeignKey(FileUpload, null=False, blank=False, on_delete=models.CASCADE) - token = models.CharField(max_length=255) - size = models.CharField(max_length=9, - choices=( - ("small", "Small"), - ("medium", "Medium"), - ("large", "Large"), - ("thumbnail", "Thumbnail"), - ("original", "Original")), - default="medium") - - def save(self, *args, **kwargs): - if not self.token: - self.token = uuid4() - return super().save(*args, **kwargs) - +from dojo.risk_acceptance.models import Risk_Acceptance # noqa: E402, F401 -- re-export ANNOUNCEMENT_STYLE_CHOICES = ( ("info", "Info"), @@ -951,12 +708,10 @@ def __str__(self): admin.site.register(Languages) admin.site.register(Language_Type) admin.site.register(App_Analysis) -admin.site.register(FileUpload) -admin.site.register(FileAccessToken) -admin.site.register(Risk_Acceptance) +# FileUpload + FileAccessToken admin registered in dojo/file_uploads/admin.py admin.site.register(Check_List) -admin.site.register(Notes) -admin.site.register(Note_Type) +# Notes + NoteHistory admin registered in dojo/notes/admin.py +# Note_Type admin registered in dojo/note_type/admin.py admin.site.register(SLA_Configuration) admin.site.register(Regulation) from dojo.authorization.models import ( # noqa: E402 @@ -984,8 +739,8 @@ def __str__(self): admin.site.register(Product_Type_Member) admin.site.register(Product_Type_Group) -admin.site.register(NoteHistory) -admin.site.register(Report_Type) +# NoteHistory admin registered in dojo/notes/admin.py +# Report_Type admin registered in dojo/reports/admin.py admin.site.register(DojoMeta) admin.site.register(Development_Environment) admin.site.register(Announcement) diff --git a/dojo/note_type/__init__.py b/dojo/note_type/__init__.py index e69de29bb2d..2c7095d7b93 100644 --- a/dojo/note_type/__init__.py +++ b/dojo/note_type/__init__.py @@ -0,0 +1 @@ +import dojo.note_type.admin # noqa: F401 diff --git a/dojo/note_type/admin.py b/dojo/note_type/admin.py new file mode 100644 index 00000000000..6b1baab68bc --- /dev/null +++ b/dojo/note_type/admin.py @@ -0,0 +1,5 @@ +from django.contrib import admin + +from dojo.note_type.models import Note_Type + +admin.site.register(Note_Type) diff --git a/dojo/note_type/api/__init__.py b/dojo/note_type/api/__init__.py new file mode 100644 index 00000000000..e39da828cac --- /dev/null +++ b/dojo/note_type/api/__init__.py @@ -0,0 +1 @@ +path = "note_type" # noqa: RUF067 diff --git a/dojo/note_type/api/serializer.py b/dojo/note_type/api/serializer.py new file mode 100644 index 00000000000..459773cc38d --- /dev/null +++ b/dojo/note_type/api/serializer.py @@ -0,0 +1,9 @@ +from rest_framework import serializers + +from dojo.note_type.models import Note_Type + + +class NoteTypeSerializer(serializers.ModelSerializer): + class Meta: + model = Note_Type + fields = "__all__" diff --git a/dojo/note_type/api/urls.py b/dojo/note_type/api/urls.py new file mode 100644 index 00000000000..f7c5a878568 --- /dev/null +++ b/dojo/note_type/api/urls.py @@ -0,0 +1,7 @@ +from dojo.note_type.api import path +from dojo.note_type.api.views import NoteTypeViewSet + + +def add_note_type_urls(router): + router.register(path, NoteTypeViewSet, basename="note_type") + return router diff --git a/dojo/note_type/api/views.py b/dojo/note_type/api/views.py new file mode 100644 index 00000000000..cde7c4d862f --- /dev/null +++ b/dojo/note_type/api/views.py @@ -0,0 +1,27 @@ +from django_filters.rest_framework import DjangoFilterBackend + +from dojo.api_v2.views import DojoModelViewSet +from dojo.authorization import api_permissions as permissions +from dojo.note_type.api.serializer import NoteTypeSerializer +from dojo.note_type.models import Note_Type + + +# Authorization: configuration +class NoteTypeViewSet( + DojoModelViewSet, +): + serializer_class = NoteTypeSerializer + queryset = Note_Type.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "id", + "name", + "description", + "is_single", + "is_active", + "is_mandatory", + ] + permission_classes = (permissions.UserHasConfigurationPermissionSuperuser,) + + def get_queryset(self): + return Note_Type.objects.all().order_by("id") diff --git a/dojo/note_type/models.py b/dojo/note_type/models.py new file mode 100644 index 00000000000..6ba489a8ca4 --- /dev/null +++ b/dojo/note_type/models.py @@ -0,0 +1,12 @@ +from django.db import models + + +class Note_Type(models.Model): + name = models.CharField(max_length=100, unique=True) + description = models.CharField(max_length=200) + is_single = models.BooleanField(default=False, null=False) + is_active = models.BooleanField(default=True, null=False) + is_mandatory = models.BooleanField(default=True, null=False) + + def __str__(self): + return self.name diff --git a/dojo/note_type/ui/__init__.py b/dojo/note_type/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/note_type/ui/forms.py b/dojo/note_type/ui/forms.py new file mode 100644 index 00000000000..8cbbef09020 --- /dev/null +++ b/dojo/note_type/ui/forms.py @@ -0,0 +1,35 @@ +from django import forms + +from dojo.note_type.models import Note_Type + + +class NoteTypeForm(forms.ModelForm): + description = forms.CharField(widget=forms.Textarea(attrs={}), + required=True) + + class Meta: + model = Note_Type + fields = ["name", "description", "is_single", "is_mandatory"] + + +class EditNoteTypeForm(NoteTypeForm): + + def __init__(self, *args, **kwargs): + is_single = kwargs.pop("is_single") + super().__init__(*args, **kwargs) + if is_single is False: + self.fields["is_single"].widget = forms.HiddenInput() + + +class DisableOrEnableNoteTypeForm(NoteTypeForm): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["name"].disabled = True + self.fields["description"].disabled = True + self.fields["is_single"].disabled = True + self.fields["is_mandatory"].disabled = True + self.fields["is_active"].disabled = True + + class Meta: + model = Note_Type + fields = "__all__" diff --git a/dojo/note_type/urls.py b/dojo/note_type/ui/urls.py similarity index 93% rename from dojo/note_type/urls.py rename to dojo/note_type/ui/urls.py index 76e3c3a6a2c..4f422a5d502 100644 --- a/dojo/note_type/urls.py +++ b/dojo/note_type/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from dojo.note_type import views +from dojo.note_type.ui import views urlpatterns = [ re_path(r"^note_type$", diff --git a/dojo/note_type/views.py b/dojo/note_type/ui/views.py similarity index 96% rename from dojo/note_type/views.py rename to dojo/note_type/ui/views.py index 65c908c740e..27aa93f6958 100644 --- a/dojo/note_type/views.py +++ b/dojo/note_type/ui/views.py @@ -6,8 +6,8 @@ from django.urls import reverse from dojo.filters import NoteTypesFilter -from dojo.forms import DisableOrEnableNoteTypeForm, EditNoteTypeForm, NoteTypeForm -from dojo.models import Note_Type +from dojo.note_type.models import Note_Type +from dojo.note_type.ui.forms import DisableOrEnableNoteTypeForm, EditNoteTypeForm, NoteTypeForm from dojo.utils import add_breadcrumb, get_page_items logger = logging.getLogger(__name__) diff --git a/dojo/notes/__init__.py b/dojo/notes/__init__.py index e69de29bb2d..6871614a351 100644 --- a/dojo/notes/__init__.py +++ b/dojo/notes/__init__.py @@ -0,0 +1 @@ +import dojo.notes.admin # noqa: F401 diff --git a/dojo/notes/admin.py b/dojo/notes/admin.py new file mode 100644 index 00000000000..2c3ccf06f9c --- /dev/null +++ b/dojo/notes/admin.py @@ -0,0 +1,6 @@ +from django.contrib import admin + +from dojo.notes.models import NoteHistory, Notes + +admin.site.register(Notes) +admin.site.register(NoteHistory) diff --git a/dojo/notes/api/__init__.py b/dojo/notes/api/__init__.py new file mode 100644 index 00000000000..c042966fa4f --- /dev/null +++ b/dojo/notes/api/__init__.py @@ -0,0 +1 @@ +path = "notes" # noqa: RUF067 diff --git a/dojo/notes/api/serializer.py b/dojo/notes/api/serializer.py new file mode 100644 index 00000000000..e4bd3f83e5d --- /dev/null +++ b/dojo/notes/api/serializer.py @@ -0,0 +1,41 @@ +from django.utils import timezone +from rest_framework import serializers + +from dojo.note_type.api.serializer import NoteTypeSerializer +from dojo.notes.models import NoteHistory, Notes +from dojo.user.api.serializer import UserStubSerializer + + +class NoteHistorySerializer(serializers.ModelSerializer): + current_editor = UserStubSerializer(read_only=True) + note_type = NoteTypeSerializer(read_only=True, many=False) + + class Meta: + model = NoteHistory + fields = "__all__" + + +class NoteSerializer(serializers.ModelSerializer): + author = UserStubSerializer(many=False, read_only=True) + editor = UserStubSerializer(read_only=True, many=False, allow_null=True) + history = NoteHistorySerializer(read_only=True, many=True) + note_type = NoteTypeSerializer(read_only=True, many=False) + + def update(self, instance, validated_data): + instance.entry = validated_data.get("entry") + instance.edited = True + instance.editor = self.context["request"].user + instance.edit_time = timezone.now() + history = NoteHistory( + data=instance.entry, + time=instance.edit_time, + current_editor=instance.editor, + ) + history.save() + instance.history.add(history) + instance.save() + return instance + + class Meta: + model = Notes + fields = "__all__" diff --git a/dojo/notes/api/urls.py b/dojo/notes/api/urls.py new file mode 100644 index 00000000000..2d7d582551a --- /dev/null +++ b/dojo/notes/api/urls.py @@ -0,0 +1,7 @@ +from dojo.notes.api import path +from dojo.notes.api.views import NotesViewSet + + +def add_notes_urls(router): + router.register(path, NotesViewSet, basename="notes") + return router diff --git a/dojo/notes/api/views.py b/dojo/notes/api/views.py new file mode 100644 index 00000000000..29fe0e74030 --- /dev/null +++ b/dojo/notes/api/views.py @@ -0,0 +1,31 @@ +from django_filters.rest_framework import DjangoFilterBackend +from rest_framework import mixins, viewsets +from rest_framework.permissions import DjangoModelPermissions + +from dojo.authorization import api_permissions as permissions +from dojo.notes.api.serializer import NoteSerializer +from dojo.notes.models import Notes + + +# Authorization: superuser +class NotesViewSet( + mixins.UpdateModelMixin, + viewsets.ReadOnlyModelViewSet, +): + serializer_class = NoteSerializer + queryset = Notes.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "id", + "entry", + "author", + "private", + "date", + "edited", + "edit_time", + "editor", + ] + permission_classes = (permissions.IsSuperUser, DjangoModelPermissions) + + def get_queryset(self): + return Notes.objects.all().order_by("id") diff --git a/dojo/notes/models.py b/dojo/notes/models.py new file mode 100644 index 00000000000..26df9ab7a7a --- /dev/null +++ b/dojo/notes/models.py @@ -0,0 +1,49 @@ +from django.db import models + +from dojo.models import copy_model_util, get_current_datetime + + +class NoteHistory(models.Model): + note_type = models.ForeignKey("dojo.Note_Type", null=True, blank=True, on_delete=models.CASCADE) + data = models.TextField() + time = models.DateTimeField(null=True, editable=False, + default=get_current_datetime) + current_editor = models.ForeignKey("dojo.Dojo_User", editable=False, null=True, on_delete=models.CASCADE) + + def copy(self): + copy = copy_model_util(self) + copy.save() + return copy + + +class Notes(models.Model): + note_type = models.ForeignKey("dojo.Note_Type", related_name="note_type", null=True, blank=True, on_delete=models.CASCADE) + entry = models.TextField() + date = models.DateTimeField(null=False, editable=False, + default=get_current_datetime) + author = models.ForeignKey("dojo.Dojo_User", related_name="editor_notes_set", editable=False, on_delete=models.CASCADE) + private = models.BooleanField(default=False) + edited = models.BooleanField(default=False) + editor = models.ForeignKey("dojo.Dojo_User", related_name="author_notes_set", editable=False, null=True, on_delete=models.CASCADE) + edit_time = models.DateTimeField(null=True, editable=False, + default=get_current_datetime) + history = models.ManyToManyField("dojo.NoteHistory", blank=True, + editable=False) + + class Meta: + ordering = ["-date"] + + def __str__(self): + return self.entry + + def copy(self): + copy = copy_model_util(self) + # Save the necessary ManyToMany relationships + old_history = list(self.history.all()) + # Save the object before setting any ManyToMany relationships + copy.save() + # Copy the history + for history in old_history: + copy.history.add(history.copy()) + + return copy diff --git a/dojo/notes/ui/__init__.py b/dojo/notes/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/notes/ui/forms.py b/dojo/notes/ui/forms.py new file mode 100644 index 00000000000..85e9423bb26 --- /dev/null +++ b/dojo/notes/ui/forms.py @@ -0,0 +1,39 @@ +from django import forms + +from dojo.notes.models import Notes +from dojo.utils import get_system_setting + + +class NoteForm(forms.ModelForm): + entry = forms.CharField(max_length=2400, widget=forms.Textarea(attrs={"rows": 4, "cols": 15}), + label="Notes:") + + class Meta: + model = Notes + fields = ["entry", "private"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if disclaimer := get_system_setting("disclaimer_notes"): + self.disclaimer = disclaimer.strip() + + +class TypedNoteForm(NoteForm): + + def __init__(self, *args, **kwargs): + queryset = kwargs.pop("available_note_types") + super().__init__(*args, **kwargs) + self.fields["note_type"] = forms.ModelChoiceField(queryset=queryset, label="Note Type", required=True) + + class Meta: + model = Notes + fields = ["note_type", "entry", "private"] + + +class DeleteNoteForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Notes + fields = ["id"] diff --git a/dojo/notes/urls.py b/dojo/notes/ui/urls.py similarity index 92% rename from dojo/notes/urls.py rename to dojo/notes/ui/urls.py index 00a9f17a83a..cf95618e5eb 100644 --- a/dojo/notes/urls.py +++ b/dojo/notes/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from . import views +from dojo.notes.ui import views urlpatterns = [ re_path(r"^notes/(?P\d+)/delete/(?P[\w-]+)/(?P\d+)$", views.delete_note, name="delete_note"), diff --git a/dojo/notes/views.py b/dojo/notes/ui/views.py similarity index 96% rename from dojo/notes/views.py rename to dojo/notes/ui/views.py index 66c4d0aecda..4b4f7c27457 100644 --- a/dojo/notes/views.py +++ b/dojo/notes/ui/views.py @@ -16,8 +16,10 @@ from dojo.finding.queries import get_authorized_findings # Local application/library imports -from dojo.forms import DeleteNoteForm, NoteForm, TypedNoteForm -from dojo.models import Engagement, Finding, Note_Type, NoteHistory, Notes, Test +from dojo.models import Engagement, Finding, Test +from dojo.note_type.models import Note_Type +from dojo.notes.models import NoteHistory, Notes +from dojo.notes.ui.forms import DeleteNoteForm, NoteForm, TypedNoteForm from dojo.test.queries import get_authorized_tests logger = logging.getLogger(__name__) diff --git a/dojo/reports/__init__.py b/dojo/reports/__init__.py index e69de29bb2d..54faba7100c 100644 --- a/dojo/reports/__init__.py +++ b/dojo/reports/__init__.py @@ -0,0 +1 @@ +import dojo.reports.admin # noqa: F401 diff --git a/dojo/reports/admin.py b/dojo/reports/admin.py new file mode 100644 index 00000000000..f0c7f236146 --- /dev/null +++ b/dojo/reports/admin.py @@ -0,0 +1,5 @@ +from django.contrib import admin + +from dojo.reports.models import Report_Type + +admin.site.register(Report_Type) diff --git a/dojo/reports/models.py b/dojo/reports/models.py new file mode 100644 index 00000000000..c1201126ff1 --- /dev/null +++ b/dojo/reports/models.py @@ -0,0 +1,5 @@ +from django.db import models + + +class Report_Type(models.Model): + name = models.CharField(max_length=255) diff --git a/dojo/reports/ui/__init__.py b/dojo/reports/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/reports/ui/forms.py b/dojo/reports/ui/forms.py new file mode 100644 index 00000000000..8c324781195 --- /dev/null +++ b/dojo/reports/ui/forms.py @@ -0,0 +1,28 @@ +from django import forms + +from dojo.utils import get_system_setting + + +class ReportOptionsForm(forms.Form): + yes_no = (("0", "No"), ("1", "Yes")) + include_finding_notes = forms.ChoiceField(choices=yes_no, label="Finding Notes") + include_finding_images = forms.ChoiceField(choices=yes_no, label="Finding Images") + include_executive_summary = forms.ChoiceField(choices=yes_no, label="Executive Summary") + include_table_of_contents = forms.ChoiceField(choices=yes_no, label="Table of Contents") + include_disclaimer = forms.ChoiceField(choices=yes_no, label="Disclaimer") + report_type = forms.ChoiceField(choices=(("HTML", "HTML"),)) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if get_system_setting("disclaimer_reports_forced"): + self.fields["include_disclaimer"].disabled = True + self.fields["include_disclaimer"].initial = "1" # represents yes + self.fields["include_disclaimer"].help_text = "Administrator of the system enforced placement of disclaimer in all reports. You are not able exclude disclaimer from this report." + + +class CustomReportOptionsForm(forms.Form): + yes_no = (("0", "No"), ("1", "Yes")) + report_name = forms.CharField(required=False, max_length=100) + include_finding_notes = forms.ChoiceField(required=False, choices=yes_no) + include_finding_images = forms.ChoiceField(choices=yes_no, label="Finding Images") + report_type = forms.ChoiceField(choices=(("HTML", "HTML"),)) diff --git a/dojo/reports/urls.py b/dojo/reports/ui/urls.py similarity index 99% rename from dojo/reports/urls.py rename to dojo/reports/ui/urls.py index 19d4348478f..b7361b95b6f 100644 --- a/dojo/reports/urls.py +++ b/dojo/reports/ui/urls.py @@ -1,7 +1,7 @@ from django.conf import settings from django.urls import re_path -from dojo.reports import views +from dojo.reports.ui import views from dojo.utils import redirect_view # TODO: remove the else: branch once v3 migration is complete diff --git a/dojo/reports/views.py b/dojo/reports/ui/views.py similarity index 99% rename from dojo/reports/views.py rename to dojo/reports/ui/views.py index 7372e9a0287..3ac2692d940 100644 --- a/dojo/reports/views.py +++ b/dojo/reports/ui/views.py @@ -27,13 +27,13 @@ ReportFindingFilterWithoutObjectLookups, ) from dojo.finding.ui.views import BaseListFindings -from dojo.forms import ReportOptionsForm from dojo.labels import get_labels from dojo.location.models import Location from dojo.location.queries import get_authorized_locations from dojo.location.status import FindingLocationStatus from dojo.models import Dojo_User, Endpoint, Engagement, Finding, Product, Product_Type, Test from dojo.reports.queries import prefetch_related_endpoints_for_report, prefetch_related_findings_for_report +from dojo.reports.ui.forms import ReportOptionsForm from dojo.reports.widgets import ( CoverPage, CustomReportJsonForm, diff --git a/dojo/reports/widgets.py b/dojo/reports/widgets.py index 07377560539..9b5375c1574 100644 --- a/dojo/reports/widgets.py +++ b/dojo/reports/widgets.py @@ -17,12 +17,12 @@ ReportFindingFilter, ReportFindingFilterWithoutObjectLookups, ) -from dojo.forms import CustomReportOptionsForm from dojo.labels import get_labels from dojo.location.models import Location from dojo.location.status import FindingLocationStatus from dojo.models import Endpoint, Finding from dojo.reports.queries import prefetch_related_endpoints_for_report, prefetch_related_findings_for_report +from dojo.reports.ui.forms import CustomReportOptionsForm from dojo.url.filters import URLFilter from dojo.utils import get_page_items, get_system_setting, get_words_for_field diff --git a/dojo/risk_acceptance/__init__.py b/dojo/risk_acceptance/__init__.py index e69de29bb2d..a5cede2cf2e 100644 --- a/dojo/risk_acceptance/__init__.py +++ b/dojo/risk_acceptance/__init__.py @@ -0,0 +1 @@ +import dojo.risk_acceptance.admin # noqa: F401 diff --git a/dojo/risk_acceptance/admin.py b/dojo/risk_acceptance/admin.py new file mode 100644 index 00000000000..cec3ae2295f --- /dev/null +++ b/dojo/risk_acceptance/admin.py @@ -0,0 +1,5 @@ +from django.contrib import admin + +from dojo.risk_acceptance.models import Risk_Acceptance + +admin.site.register(Risk_Acceptance) diff --git a/dojo/risk_acceptance/api/__init__.py b/dojo/risk_acceptance/api/__init__.py new file mode 100644 index 00000000000..97fa34f5990 --- /dev/null +++ b/dojo/risk_acceptance/api/__init__.py @@ -0,0 +1,12 @@ +path = "risk_acceptance" # noqa: RUF067 + +# Backward-compat: the AcceptedRisks/AcceptedFindings mixins + AcceptedRiskSerializer +# were historically importable as `dojo.risk_acceptance.api.` (via the old api.py). +# finding/test/engagement api viewsets consume them as `ra_api.` — keep them resolvable. +from dojo.risk_acceptance.api.mixins import ( # noqa: E402, F401 -- backward compat + AcceptedFindingsMixin, + AcceptedRisk, + AcceptedRiskSerializer, + AcceptedRisksMixin, + _accept_risks, +) diff --git a/dojo/risk_acceptance/api/filters.py b/dojo/risk_acceptance/api/filters.py new file mode 100644 index 00000000000..2ed3f4d8c15 --- /dev/null +++ b/dojo/risk_acceptance/api/filters.py @@ -0,0 +1,37 @@ +from dojo.filters import DateRangeFilter, DojoFilter, OrderingFilter +from dojo.models import Risk_Acceptance + + +class ApiRiskAcceptanceFilter(DojoFilter): + created = DateRangeFilter() + updated = DateRangeFilter() + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("name", "name"), + ("created", "created"), + ("updated", "updated"), + ), + ) + + class Meta: + model = Risk_Acceptance + fields = { + "name": ["exact", "icontains"], + "accepted_findings": ["exact"], + "recommendation": ["exact"], + "recommendation_details": ["exact", "icontains"], + "decision": ["exact"], + "decision_details": ["exact", "icontains"], + "accepted_by": ["exact", "icontains"], + "owner": ["exact"], + "expiration_date": ["exact", "gt", "lt", "gte", "lte"], + "expiration_date_warned": ["exact", "gt", "lt", "gte", "lte"], + "expiration_date_handled": ["exact", "gt", "lt", "gte", "lte"], + "reactivate_expired": ["exact"], + "restart_sla_expired": ["exact"], + "notes": ["exact"], + "created": ["exact", "gt", "lt", "gte", "lte"], + "updated": ["exact", "gt", "lt", "gte", "lte"], + } diff --git a/dojo/risk_acceptance/api.py b/dojo/risk_acceptance/api/mixins.py similarity index 98% rename from dojo/risk_acceptance/api.py rename to dojo/risk_acceptance/api/mixins.py index 78fa27062e9..cc245943d74 100644 --- a/dojo/risk_acceptance/api.py +++ b/dojo/risk_acceptance/api/mixins.py @@ -10,10 +10,10 @@ from rest_framework.permissions import IsAuthenticated from rest_framework.response import Response -from dojo.api_v2.serializers import RiskAcceptanceSerializer from dojo.authorization.api_permissions import UserHasRiskAcceptanceRelatedObjectPermission from dojo.engagement.queries import get_authorized_engagements from dojo.models import Engagement, Risk_Acceptance, User, Vulnerability_Id +from dojo.risk_acceptance.api.serializer import RiskAcceptanceSerializer AcceptedRisk = NamedTuple("AcceptedRisk", (("vulnerability_id", str), ("justification", str), ("accepted_by", str))) diff --git a/dojo/risk_acceptance/api/serializer.py b/dojo/risk_acceptance/api/serializer.py new file mode 100644 index 00000000000..39b4bdd879c --- /dev/null +++ b/dojo/risk_acceptance/api/serializer.py @@ -0,0 +1,137 @@ +from django.core.exceptions import PermissionDenied, ValidationError +from django.urls import reverse +from drf_spectacular.utils import extend_schema_field +from rest_framework import serializers + +import dojo.risk_acceptance.helper as ra_helper +from dojo.finding.queries import get_authorized_findings +from dojo.models import Engagement, Finding +from dojo.notes.api.serializer import NoteSerializer +from dojo.risk_acceptance.models import Risk_Acceptance + + +class RiskAcceptanceProofSerializer(serializers.ModelSerializer): + path = serializers.FileField(required=True) + + class Meta: + model = Risk_Acceptance + fields = ["path"] + + +class RiskAcceptanceToNotesSerializer(serializers.Serializer): + risk_acceptance_id = serializers.PrimaryKeyRelatedField( + queryset=Risk_Acceptance.objects.all(), many=False, allow_null=True, + ) + notes = NoteSerializer(many=True) + + +class RiskAcceptanceSerializer(serializers.ModelSerializer): + path = serializers.SerializerMethodField() + + def create(self, validated_data): + instance = super().create(validated_data) + user = getattr(self.context.get("request", None), "user", None) + ra_helper.add_findings_to_risk_acceptance(user, instance, instance.accepted_findings.all()) + + # Add risk acceptance to engagement + # This is fine as Pro has its own model + relationshop to track links with engagements. + if instance.accepted_findings.exists(): + engagement = instance.accepted_findings.first().test.engagement + engagement.risk_acceptance.add(instance) + + return instance + + def update(self, instance, validated_data): + # Determine findings to risk accept, and findings to unaccept risk + existing_findings = Finding.objects.filter(risk_acceptance=self.instance.id) + new_findings_ids = [x.id for x in validated_data.get("accepted_findings", [])] + new_findings = Finding.objects.filter(id__in=new_findings_ids) + findings_to_add = set(new_findings) - set(existing_findings) + findings_to_remove = set(existing_findings) - set(new_findings) + findings_to_add = Finding.objects.filter(id__in=[x.id for x in findings_to_add]) + findings_to_remove = Finding.objects.filter(id__in=[x.id for x in findings_to_remove]) + # Make the update in the database + instance = super().update(instance, validated_data) + user = getattr(self.context.get("request", None), "user", None) + # Add the new findings + ra_helper.add_findings_to_risk_acceptance(user, instance, findings_to_add) + # Remove the ones that were not present in the payload + for finding in findings_to_remove: + ra_helper.remove_finding_from_risk_acceptance(user, instance, finding) + + # Handle orphaned risk acceptances: link to engagement if it now has findings + # This is fine as Pro has its own model + relationshop to track links with engagements. + if instance.accepted_findings.exists() and not instance.engagement: + engagement = instance.accepted_findings.first().test.engagement + engagement.risk_acceptance.add(instance) + + return instance + + @extend_schema_field(serializers.CharField()) + def get_path(self, obj): + engagement = Engagement.objects.filter( + risk_acceptance__id__in=[obj.id], + ).first() + path = "No proof has been supplied" + if engagement and obj.filename() is not None: + path = reverse( + "download_risk_acceptance", args=(engagement.id, obj.id), + ) + request = self.context.get("request") + if request: + path = request.build_absolute_uri(path) + return path + + @extend_schema_field(serializers.IntegerField()) + def get_engagement(self, obj): + from dojo.engagement.api.serializer import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + EngagementSerializer, + ) + engagement = Engagement.objects.filter( + risk_acceptance__id__in=[obj.id], + ).first() + return EngagementSerializer(read_only=True).to_representation( + engagement, + ) + + def validate(self, data): + def validate_findings_have_same_engagement(finding_objects: list[Finding]): + engagements = finding_objects.values_list("test__engagement__id", flat=True).distinct().count() + if engagements > 1: + msg = "You are not permitted to add findings from multiple engagements" + raise PermissionDenied(msg) + + findings = data.get("accepted_findings", []) + findings_ids = [x.id for x in findings] + finding_objects = Finding.objects.filter(id__in=findings_ids) + authed_findings = get_authorized_findings("edit").filter(id__in=findings_ids) + if len(findings) != len(authed_findings): + msg = "You are not permitted to add one or more selected findings to this risk acceptance" + raise PermissionDenied(msg) + if self.context["request"].method == "POST": + validate_findings_have_same_engagement(finding_objects) + + # Validate product allows full risk acceptance BEFORE creating instance + if finding_objects.exists(): + engagement = finding_objects.first().test.engagement + if not engagement.product.enable_full_risk_acceptance: + msg = "Full risk acceptance is not enabled for this product" + raise PermissionDenied(msg) + elif self.context["request"].method in {"PATCH", "PUT"}: + # Use the reverse relation instead of filtering + existing_findings = self.instance.accepted_findings.all() + existing_and_new_findings = existing_findings | finding_objects + validate_findings_have_same_engagement(existing_and_new_findings) + + # Explicit check to prevent engagement switching + risk_acceptance_engagement = self.instance.engagement + if risk_acceptance_engagement and finding_objects.exists(): + new_findings_engagement = finding_objects.first().test.engagement + if risk_acceptance_engagement.id != new_findings_engagement.id: + msg = f"Risk Acceptance belongs to engagement {risk_acceptance_engagement.id}. Cannot add findings from engagement {new_findings_engagement.id}" + raise ValidationError(msg) + return data + + class Meta: + model = Risk_Acceptance + fields = "__all__" diff --git a/dojo/risk_acceptance/api/urls.py b/dojo/risk_acceptance/api/urls.py new file mode 100644 index 00000000000..5b3387ce2ce --- /dev/null +++ b/dojo/risk_acceptance/api/urls.py @@ -0,0 +1,7 @@ +from dojo.risk_acceptance.api import path +from dojo.risk_acceptance.api.views import RiskAcceptanceViewSet + + +def add_risk_acceptance_urls(router): + router.register(path, RiskAcceptanceViewSet, basename="risk_acceptance") + return router diff --git a/dojo/risk_acceptance/api/views.py b/dojo/risk_acceptance/api/views.py new file mode 100644 index 00000000000..1b6a540cf0a --- /dev/null +++ b/dojo/risk_acceptance/api/views.py @@ -0,0 +1,148 @@ +import mimetypes +from pathlib import Path + +from django.conf import settings +from django.http import FileResponse +from django.urls import reverse +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema +from rest_framework import status +from rest_framework.decorators import action +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from dojo.api_v2 import serializers as api_v2_serializers +from dojo.api_v2.views import PrefetchDojoModelViewSet +from dojo.authorization import api_permissions as permissions +from dojo.models import NoteHistory, Notes, Risk_Acceptance +from dojo.risk_acceptance.api.filters import ApiRiskAcceptanceFilter +from dojo.risk_acceptance.api.serializer import ( + RiskAcceptanceProofSerializer, + RiskAcceptanceSerializer, + RiskAcceptanceToNotesSerializer, +) +from dojo.risk_acceptance.helper import remove_finding_from_risk_acceptance +from dojo.risk_acceptance.queries import get_authorized_risk_acceptances +from dojo.utils import process_tag_notifications + + +class RiskAcceptanceViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = RiskAcceptanceSerializer + queryset = Risk_Acceptance.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_class = ApiRiskAcceptanceFilter + + permission_classes = ( + IsAuthenticated, + permissions.UserHasRiskAcceptancePermission, + ) + + def destroy(self, request, pk=None): + instance = self.get_object() + # Remove any findings on the risk acceptance + for finding in instance.accepted_findings.all(): + remove_finding_from_risk_acceptance(request.user, instance, finding) + # return the response of the object being deleted + return super().destroy(request, pk=pk) + + def get_queryset(self): + return ( + get_authorized_risk_acceptances("edit") + .prefetch_related( + "notes", "engagement_set", "owner", "accepted_findings", + ) + .distinct() + ) + + @extend_schema( + methods=["GET"], + responses={ + status.HTTP_200_OK: RiskAcceptanceToNotesSerializer, + }, + ) + @extend_schema( + methods=["POST"], + request=api_v2_serializers.AddNewNoteOptionSerializer, + responses={status.HTTP_201_CREATED: api_v2_serializers.NoteSerializer}, + ) + @action(detail=True, methods=["get", "post"], permission_classes=(IsAuthenticated, permissions.UserHasRiskAcceptanceRelatedObjectPermission)) + def notes(self, request, pk=None): + risk_acceptance = self.get_object() + if request.method == "POST": + new_note = api_v2_serializers.AddNewNoteOptionSerializer(data=request.data) + if new_note.is_valid(): + entry = new_note.validated_data["entry"] + private = new_note.validated_data.get("private", False) + note_type = new_note.validated_data.get("note_type", None) + else: + return Response(new_note.errors, status=status.HTTP_400_BAD_REQUEST) + + notes = risk_acceptance.notes.filter(note_type=note_type).first() + if notes and note_type and note_type.is_single: + return Response("Only one instance of this note_type allowed on a risk acceptance.", status=status.HTTP_400_BAD_REQUEST) + + author = request.user + note = Notes(entry=entry, author=author, private=private, note_type=note_type) + note.save() + history = NoteHistory.objects.create(data=note.entry, time=note.date, current_editor=note.author) + note.history.add(history) + risk_acceptance.notes.add(note) + engagement = risk_acceptance.engagement + if engagement: + process_tag_notifications( + request=request, + note=note, + parent_url=request.build_absolute_uri( + reverse("view_risk_acceptance", args=(engagement.id, risk_acceptance.id)), + ), + parent_title=f"Risk Acceptance: {risk_acceptance.name}", + ) + + serialized_note = api_v2_serializers.NoteSerializer( + {"author": author, "entry": entry, "private": private}, + ) + return Response(serialized_note.data, status=status.HTTP_201_CREATED) + + notes = risk_acceptance.notes.all() + serialized_notes = RiskAcceptanceToNotesSerializer( + {"risk_acceptance_id": risk_acceptance, "notes": notes}, + ) + return Response(serialized_notes.data, status=status.HTTP_200_OK) + + @extend_schema( + methods=["GET"], + responses={ + status.HTTP_200_OK: RiskAcceptanceProofSerializer, + }, + ) + @action(detail=True, methods=["get"], permission_classes=(IsAuthenticated, permissions.UserHasRiskAcceptanceRelatedObjectPermission)) + def download_proof(self, request, pk=None): + risk_acceptance = self.get_object() + # Get the file object + file_object = risk_acceptance.path + if file_object is None or risk_acceptance.filename() is None: + return Response( + {"error": "Proof has not provided to this risk acceptance..."}, + status=status.HTTP_404_NOT_FOUND, + ) + # Get the path of the file in media root + file_path = Path(settings.MEDIA_ROOT) / file_object.name + # NOTE: FileResponse takes ownership of closing the file handle when the response is closed. + # Explicitly register the closer to avoid potential resource leaks and satisfy static analyzers. + file_handle = file_path.open("rb") + # send file + response = FileResponse( + file_handle, + content_type=mimetypes.guess_type(str(file_path))[0] or "application/octet-stream", + status=status.HTTP_200_OK, + ) + if hasattr(response, "_resource_closers"): + response._resource_closers.append(file_handle.close) + response["Content-Length"] = file_object.size + response[ + "Content-Disposition" + ] = f'attachment; filename="{risk_acceptance.filename()}"' + + return response diff --git a/dojo/risk_acceptance/models.py b/dojo/risk_acceptance/models.py new file mode 100644 index 00000000000..54354e67df2 --- /dev/null +++ b/dojo/risk_acceptance/models.py @@ -0,0 +1,113 @@ +from pathlib import Path + +from django.db import models +from django.urls import reverse +from django.utils import timezone +from django.utils.translation import gettext as _ + +# copy_model_util is defined early in dojo.models, before the re-export that loads this +# module, so this resolves despite the partial circular load. +from dojo.models import copy_model_util + + +class Risk_Acceptance(models.Model): + TREATMENT_ACCEPT = "A" + TREATMENT_AVOID = "V" + TREATMENT_MITIGATE = "M" + TREATMENT_FIX = "F" + TREATMENT_TRANSFER = "T" + + TREATMENT_TRANSLATIONS = { + TREATMENT_ACCEPT: _("Accept (The risk is acknowledged, yet remains)"), + TREATMENT_AVOID: _("Avoid (Do not engage with whatever creates the risk)"), + TREATMENT_MITIGATE: _("Mitigate (The risk still exists, yet compensating controls make it less of a threat)"), + TREATMENT_FIX: _("Fix (The risk is eradicated)"), + TREATMENT_TRANSFER: _("Transfer (The risk is transferred to a 3rd party)"), + } + + TREATMENT_CHOICES = [ + (TREATMENT_ACCEPT, TREATMENT_TRANSLATIONS[TREATMENT_ACCEPT]), + (TREATMENT_AVOID, TREATMENT_TRANSLATIONS[TREATMENT_AVOID]), + (TREATMENT_MITIGATE, TREATMENT_TRANSLATIONS[TREATMENT_MITIGATE]), + (TREATMENT_FIX, TREATMENT_TRANSLATIONS[TREATMENT_FIX]), + (TREATMENT_TRANSFER, TREATMENT_TRANSLATIONS[TREATMENT_TRANSFER]), + ] + + name = models.CharField(max_length=300, null=False, blank=False, help_text=_("Descriptive name which in the future may also be used to group risk acceptances together across engagements and products")) + + accepted_findings = models.ManyToManyField("dojo.Finding") + + recommendation = models.CharField(choices=TREATMENT_CHOICES, max_length=2, null=False, default=TREATMENT_FIX, help_text=_("Recommendation from the security team."), verbose_name=_("Security Recommendation")) + + recommendation_details = models.TextField(null=True, + blank=True, + help_text=_("Explanation of security recommendation"), verbose_name=_("Security Recommendation Details")) + + decision = models.CharField(choices=TREATMENT_CHOICES, max_length=2, null=False, default=TREATMENT_ACCEPT, help_text=_("Risk treatment decision by risk owner")) + decision_details = models.TextField(default=None, blank=True, null=True, help_text=_("If a compensating control exists to mitigate the finding or reduce risk, then list the compensating control(s).")) + + accepted_by = models.CharField(max_length=200, default=None, null=True, blank=True, verbose_name=_("Accepted By"), help_text=_("The person that accepts the risk, can be outside of DefectDojo.")) + path = models.FileField(upload_to="risk/%Y/%m/%d", + editable=True, null=True, + blank=True, verbose_name=_("Proof")) + owner = models.ForeignKey("dojo.Dojo_User", editable=True, on_delete=models.RESTRICT, help_text=_("User in DefectDojo owning this acceptance. Only the owner and staff users can edit the risk acceptance.")) + + expiration_date = models.DateTimeField(default=None, null=True, blank=True, help_text=_("When the risk acceptance expires, the findings will be reactivated (unless disabled below).")) + expiration_date_warned = models.DateTimeField(default=None, null=True, blank=True, help_text=_("(readonly) Date at which notice about the risk acceptance expiration was sent.")) + expiration_date_handled = models.DateTimeField(default=None, null=True, blank=True, help_text=_("(readonly) When the risk acceptance expiration was handled (manually or by the daily job).")) + reactivate_expired = models.BooleanField(null=False, blank=False, default=True, verbose_name=_("Reactivate findings on expiration"), help_text=_("Reactivate findings when risk acceptance expires?")) + restart_sla_expired = models.BooleanField(default=False, null=False, verbose_name=_("Restart SLA on expiration"), help_text=_("When enabled, the SLA for findings is restarted when the risk acceptance expires.")) + + notes = models.ManyToManyField("dojo.Notes", editable=False) + created = models.DateTimeField(auto_now_add=True, null=False) + updated = models.DateTimeField(auto_now=True, editable=False) + + def __str__(self): + return str(self.name) + + def filename(self): + # logger.debug('path: "%s"', self.path) + if not self.path: + return None + return Path(self.path.name).name + + @property + def name_and_expiration_info(self): + return str(self.name) + (" (expired " if self.is_expired else " (expires ") + (timezone.localtime(self.expiration_date).strftime("%b %d, %Y") if self.expiration_date else "Never") + ")" + + def get_breadcrumbs(self): + bc = self.engagement_set.first().get_breadcrumbs() + bc += [{"title": str(self), + "url": reverse("view_risk_acceptance", args=( + self.engagement_set.first().product.id, self.id))}] + return bc + + @property + def is_expired(self): + return self.expiration_date_handled is not None + + # relationship is many to many, but we use it as one-to-many + @property + def engagement(self): + engs = self.engagement_set.all() + if engs: + return engs[0] + + return None + + def copy(self, engagement=None): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + copy = copy_model_util(self) + # Save the necessary ManyToMany relationships + old_notes = list(self.notes.all()) + old_accepted_findings_hash_codes = [finding.hash_code for finding in self.accepted_findings.all()] + # Save the object before setting any ManyToMany relationships + copy.save() + # Copy the notes + for notes in old_notes: + copy.notes.add(notes.copy()) + # Assign any accepted findings + if engagement: + new_accepted_findings = Finding.objects.filter(test__engagement=engagement, hash_code__in=old_accepted_findings_hash_codes, risk_accepted=True).distinct() + copy.accepted_findings.set(new_accepted_findings) + return copy diff --git a/dojo/risk_acceptance/ui/__init__.py b/dojo/risk_acceptance/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/risk_acceptance/ui/forms.py b/dojo/risk_acceptance/ui/forms.py new file mode 100644 index 00000000000..8344bc0e769 --- /dev/null +++ b/dojo/risk_acceptance/ui/forms.py @@ -0,0 +1,80 @@ +import logging +from pathlib import Path + +from dateutil.relativedelta import relativedelta +from django import forms +from django.conf import settings +from django.core.exceptions import ValidationError +from django.utils import timezone + +from dojo.finding.queries import get_authorized_findings +from dojo.models import Finding, Risk_Acceptance +from dojo.utils import get_system_setting + +logger = logging.getLogger(__name__) + + +class EditRiskAcceptanceForm(forms.ModelForm): + # unfortunately django forces us to repeat many things here. choices, default, required etc. + recommendation = forms.ChoiceField(choices=Risk_Acceptance.TREATMENT_CHOICES, initial=Risk_Acceptance.TREATMENT_ACCEPT, widget=forms.RadioSelect, label="Security Recommendation") + decision = forms.ChoiceField(choices=Risk_Acceptance.TREATMENT_CHOICES, initial=Risk_Acceptance.TREATMENT_ACCEPT, widget=forms.RadioSelect) + + path = forms.FileField(label="Proof", required=False, widget=forms.widgets.FileInput(attrs={"accept": ", ".join(settings.FILE_IMPORT_TYPES)})) + expiration_date = forms.DateTimeField(required=False, widget=forms.TextInput(attrs={"class": "datepicker"})) + + class Meta: + model = Risk_Acceptance + exclude = ["accepted_findings", "notes"] + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.fields["path"].help_text = f"Existing proof uploaded: {self.instance.filename()}" if self.instance.filename() else "None" + self.fields["expiration_date_warned"].disabled = True + self.fields["expiration_date_handled"].disabled = True + + def clean_path(self): + if (data := self.cleaned_data.get("path")) is not None: + ext = Path(data.name).suffix # [0] returns path+filename + valid_extensions = settings.FILE_UPLOAD_TYPES + if ext.lower() not in valid_extensions: + if accepted_extensions := f"{', '.join(valid_extensions)}": + msg = f"Unsupported extension. Supported extensions are as follows: {accepted_extensions}" + else: + msg = "File uploads are prohibited due to the list of acceptable file extensions being empty" + raise ValidationError(msg) + return data + + +class RiskAcceptanceForm(EditRiskAcceptanceForm): + accepted_findings = forms.ModelMultipleChoiceField( + queryset=Finding.objects.none(), required=True, + widget=forms.widgets.SelectMultiple(attrs={"size": 10}), + help_text=("Active, verified findings listed, please select to add findings.")) + notes = forms.CharField(required=False, max_length=2400, + widget=forms.Textarea, + label="Notes") + + class Meta: + model = Risk_Acceptance + fields = "__all__" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + expiration_delta_days = get_system_setting("risk_acceptance_form_default_days") + logger.debug("expiration_delta_days: %i", expiration_delta_days) + if expiration_delta_days > 0: + expiration_date = timezone.now().date() + relativedelta(days=expiration_delta_days) + # logger.debug('setting default expiration_date: %s', expiration_date) + self.fields["expiration_date"].initial = expiration_date + # self.fields['path'].help_text = 'Existing proof uploaded: %s' % self.instance.filename() if self.instance.filename() else 'None' + self.fields["accepted_findings"].queryset = get_authorized_findings("edit") + if disclaimer := get_system_setting("disclaimer_notes"): + self.disclaimer = disclaimer.strip() + + +class ReplaceRiskAcceptanceProofForm(forms.ModelForm): + path = forms.FileField(label="Proof", required=True, widget=forms.widgets.FileInput(attrs={"accept": ".jpg,.png,.pdf"})) + + class Meta: + model = Risk_Acceptance + fields = ["path"] diff --git a/dojo/url/ui/views.py b/dojo/url/ui/views.py index 13eb6521286..3d9845fb561 100644 --- a/dojo/url/ui/views.py +++ b/dojo/url/ui/views.py @@ -24,7 +24,7 @@ from dojo.location.queries import annotate_location_counts_and_status, get_authorized_locations from dojo.location.status import FindingLocationStatus, ProductLocationStatus from dojo.models import DojoMeta, Finding, Product -from dojo.reports.views import generate_report +from dojo.reports.ui.views import generate_report from dojo.url.filters import URLFilter from dojo.url.models import URL from dojo.url.queries import annotate_host_contents diff --git a/dojo/urls.py b/dojo/urls.py index 28d2427cebd..be79a7f8bd0 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -26,11 +26,8 @@ LanguageTypeViewSet, LanguageViewSet, NetworkLocationsViewset, - NotesViewSet, - NoteTypeViewSet, RegulationsViewSet, ReImportScanView, - RiskAcceptanceViewSet, SLAConfigurationViewset, SonarqubeIssueTransitionViewSet, SonarqubeIssueViewSet, @@ -55,8 +52,10 @@ from dojo.location.api.endpoint_compat import V3EndpointCompatibleViewSet, V3EndpointStatusCompatibleViewSet from dojo.location.api.urls import add_locations_urls from dojo.metrics.urls import urlpatterns as metrics_urls -from dojo.note_type.urls import urlpatterns as note_type_urls -from dojo.notes.urls import urlpatterns as notes_urls +from dojo.note_type.api.urls import add_note_type_urls +from dojo.note_type.ui.urls import urlpatterns as note_type_urls +from dojo.notes.api.urls import add_notes_urls +from dojo.notes.ui.urls import urlpatterns as notes_urls from dojo.notifications.api.urls import add_notifications_urls from dojo.notifications.ui.urls import urlpatterns as notifications_urls from dojo.object.urls import urlpatterns as object_urls @@ -65,7 +64,8 @@ from dojo.product.api.urls import add_product_urls from dojo.product_type.api.urls import add_product_type_urls from dojo.regulations.urls import urlpatterns as regulations -from dojo.reports.urls import urlpatterns as reports_urls +from dojo.reports.ui.urls import urlpatterns as reports_urls +from dojo.risk_acceptance.api.urls import add_risk_acceptance_urls from dojo.search.urls import urlpatterns as search_urls from dojo.sla_config.urls import urlpatterns as sla_urls from dojo.survey.ui.urls import urlpatterns as survey_urls @@ -116,8 +116,8 @@ v2_api.register(r"language_types", LanguageTypeViewSet, basename="language_type") v2_api.register(r"metadata", DojoMetaViewSet, basename="metadata") v2_api.register(r"network_locations", NetworkLocationsViewset, basename="network_locations") -v2_api.register(r"notes", NotesViewSet, basename="notes") -v2_api.register(r"note_type", NoteTypeViewSet, basename="note_type") +v2_api = add_notes_urls(v2_api) +v2_api = add_note_type_urls(v2_api) add_notifications_urls(v2_api) v2_api = add_product_urls(v2_api) # RBAC endpoints moved to Pro under legacy authorization: @@ -129,7 +129,7 @@ # product_type_members, product_type_groups → pro/product_type_members, pro/product_type_groups v2_api.register(r"regulations", RegulationsViewSet, basename="regulations") v2_api.register(r"reimport-scan", ReImportScanView, basename="reimportscan") -v2_api.register(r"risk_acceptance", RiskAcceptanceViewSet, basename="risk_acceptance") +v2_api = add_risk_acceptance_urls(v2_api) # RBAC endpoint moved to Pro under legacy authorization: roles → pro/roles v2_api.register(r"sla_configurations", SLAConfigurationViewset, basename="sla_configurations") v2_api.register(r"sonarqube_issues", SonarqubeIssueViewSet, basename="sonarqube_issue") diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 4e55f580cc5..c661d4678b3 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -50,7 +50,6 @@ LanguageViewSet, NotesViewSet, NoteTypeViewSet, - RiskAcceptanceViewSet, SonarqubeIssueViewSet, ) from dojo.asset.api.views import ( @@ -109,6 +108,7 @@ ) from dojo.product.api.views import ProductAPIScanConfigurationViewSet, ProductViewSet from dojo.product_type.api.views import ProductTypeViewSet +from dojo.risk_acceptance.api.views import RiskAcceptanceViewSet from dojo.test.api.views import TestsViewSet, TestTypesViewSet from dojo.tool_config.api.views import ToolConfigurationsViewSet from dojo.tool_product.api.views import ToolProductSettingsViewSet