Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
229 changes: 25 additions & 204 deletions dojo/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,16 @@
import tagulous
from django.conf import settings
from django.contrib.auth.models import Permission
from django.core.exceptions import PermissionDenied, ValidationError
from django.core.exceptions import ValidationError
from django.db import transaction
from django.db.utils import IntegrityError
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from drf_spectacular.utils import extend_schema_field
from rest_framework import serializers
from rest_framework.exceptions import NotFound
from rest_framework.exceptions import ValidationError as RestFrameworkValidationError

import dojo.risk_acceptance.helper as ra_helper
from dojo.finding.queries import get_authorized_findings
from dojo.importers.auto_create_context import AutoCreateContextManager
from dojo.importers.base_importer import BaseImporter
from dojo.importers.default_importer import DefaultImporter
Expand All @@ -43,13 +40,10 @@
Language_Type,
Languages,
Network_Locations,
Note_Type,
NoteHistory,
Notes,
Product,
Product_API_Scan_Configuration,
Regulation,
Risk_Acceptance,
SLA_Configuration,
Sonarqube_Issue,
Sonarqube_Issue_Transition,
Expand Down Expand Up @@ -291,92 +285,20 @@ def validate(self, data):
return data


from dojo.user.api.serializer import ( # noqa: E402, F401 -- backward compat + prefetcher discovery
AddUserSerializer,
UserContactInfoSerializer,
UserProfileSerializer,
UserSerializer,
UserStubSerializer,
)


class NoteTypeSerializer(serializers.ModelSerializer):
class Meta:
model = Note_Type
fields = "__all__"


class NoteHistorySerializer(serializers.ModelSerializer):
current_editor = UserStubSerializer(read_only=True)
note_type = NoteTypeSerializer(read_only=True, many=False)

class Meta:
model = NoteHistory
fields = "__all__"


class NoteSerializer(serializers.ModelSerializer):
author = UserStubSerializer(many=False, read_only=True)
editor = UserStubSerializer(read_only=True, many=False, allow_null=True)
history = NoteHistorySerializer(read_only=True, many=True)
note_type = NoteTypeSerializer(read_only=True, many=False)

def update(self, instance, validated_data):
instance.entry = validated_data.get("entry")
instance.edited = True
instance.editor = self.context["request"].user
instance.edit_time = timezone.now()
history = NoteHistory(
data=instance.entry,
time=instance.edit_time,
current_editor=instance.editor,
)
history.save()
instance.history.add(history)
instance.save()
return instance

class Meta:
model = Notes
fields = "__all__"


class FileSerializer(serializers.ModelSerializer):
file = serializers.FileField(required=True)

class Meta:
model = FileUpload
fields = "__all__"

def validate(self, data):
if file := data.get("file"):
# the clean will validate the file extensions and raise a Validation error if the extensions are not accepted
FileUpload(title=file.name, file=file).clean()
return data
return None


class RawFileSerializer(serializers.ModelSerializer):
file = serializers.FileField(required=True)

class Meta:
model = FileUpload
fields = ["file"]


class RiskAcceptanceProofSerializer(serializers.ModelSerializer):
path = serializers.FileField(required=True)

class Meta:
model = Risk_Acceptance
fields = ["path"]


# Engagement serializers live in dojo/engagement/api/serializer.py.
# EngagementSerializer is re-exported here because ReportGenerateSerializer and
# RiskAcceptanceSerializer (below) still reference it. The other engagement
# serializers are imported directly from dojo.engagement.api by their consumers.
from dojo.engagement.api.serializer import EngagementSerializer # noqa: E402 -- backward compat
from dojo.file_uploads.api.serializer import ( # noqa: E402, F401 -- re-export; prefetcher + lazy consumers in finding/test/engagement
FileSerializer,
RawFileSerializer,
)
from dojo.note_type.api.serializer import NoteTypeSerializer # noqa: E402, F401 -- re-export for prefetcher discovery
from dojo.notes.api.serializer import ( # noqa: E402, F401 -- re-export; prefetcher + RiskAcceptanceToNotesSerializer + lazy consumers
NoteHistorySerializer,
NoteSerializer,
)

# Product serializers live in dojo/product/api/serializer.py. ProductSerializer is
# re-exported because ReportGenerateSerializer (below) still references it;
Expand All @@ -388,13 +310,13 @@ class Meta:
ProductSerializer,
)
from dojo.product_type.api.serializer import ProductTypeSerializer # noqa: E402


class RiskAcceptanceToNotesSerializer(serializers.Serializer):
risk_acceptance_id = serializers.PrimaryKeyRelatedField(
queryset=Risk_Acceptance.objects.all(), many=False, allow_null=True,
)
notes = NoteSerializer(many=True)
from dojo.user.api.serializer import ( # noqa: E402, F401 -- backward compat + prefetcher discovery
AddUserSerializer,
UserContactInfoSerializer,
UserProfileSerializer,
UserSerializer,
UserStubSerializer,
)


class AppAnalysisSerializer(serializers.ModelSerializer):
Expand Down Expand Up @@ -446,118 +368,17 @@ class Meta:
fields = "__all__"


# Risk acceptance serializers live in dojo/risk_acceptance/api/serializer.py. Re-exported here
# for backward compat: RiskAcceptanceSerializer is lazy-imported by dojo/finding/api/serializer.py
# (schema overrides); the ModelSerializers must also stay discoverable by the prefetcher.
from dojo.risk_acceptance.api.serializer import ( # noqa: E402 -- backward compat / prefetcher discovery
RiskAcceptanceProofSerializer, # noqa: F401
RiskAcceptanceSerializer, # noqa: F401 -- lazy-imported by finding schema overrides + prefetcher
RiskAcceptanceToNotesSerializer, # noqa: F401
)
from dojo.test.api.serializer import TestSerializer # noqa: E402 -- backward compat re-export


class RiskAcceptanceSerializer(serializers.ModelSerializer):
path = serializers.SerializerMethodField()

def create(self, validated_data):
instance = super().create(validated_data)
user = getattr(self.context.get("request", None), "user", None)
ra_helper.add_findings_to_risk_acceptance(user, instance, instance.accepted_findings.all())

# Add risk acceptance to engagement
# This is fine as Pro has its own model + relationshop to track links with engagements.
if instance.accepted_findings.exists():
engagement = instance.accepted_findings.first().test.engagement
engagement.risk_acceptance.add(instance)

return instance

def update(self, instance, validated_data):
# Determine findings to risk accept, and findings to unaccept risk
existing_findings = Finding.objects.filter(risk_acceptance=self.instance.id)
new_findings_ids = [x.id for x in validated_data.get("accepted_findings", [])]
new_findings = Finding.objects.filter(id__in=new_findings_ids)
findings_to_add = set(new_findings) - set(existing_findings)
findings_to_remove = set(existing_findings) - set(new_findings)
findings_to_add = Finding.objects.filter(id__in=[x.id for x in findings_to_add])
findings_to_remove = Finding.objects.filter(id__in=[x.id for x in findings_to_remove])
# Make the update in the database
instance = super().update(instance, validated_data)
user = getattr(self.context.get("request", None), "user", None)
# Add the new findings
ra_helper.add_findings_to_risk_acceptance(user, instance, findings_to_add)
# Remove the ones that were not present in the payload
for finding in findings_to_remove:
ra_helper.remove_finding_from_risk_acceptance(user, instance, finding)

# Handle orphaned risk acceptances: link to engagement if it now has findings
# This is fine as Pro has its own model + relationshop to track links with engagements.
if instance.accepted_findings.exists() and not instance.engagement:
engagement = instance.accepted_findings.first().test.engagement
engagement.risk_acceptance.add(instance)

return instance

@extend_schema_field(serializers.CharField())
def get_path(self, obj):
engagement = Engagement.objects.filter(
risk_acceptance__id__in=[obj.id],
).first()
path = "No proof has been supplied"
if engagement and obj.filename() is not None:
path = reverse(
"download_risk_acceptance", args=(engagement.id, obj.id),
)
request = self.context.get("request")
if request:
path = request.build_absolute_uri(path)
return path

@extend_schema_field(serializers.IntegerField())
def get_engagement(self, obj):
engagement = Engagement.objects.filter(
risk_acceptance__id__in=[obj.id],
).first()
return EngagementSerializer(read_only=True).to_representation(
engagement,
)

def validate(self, data):
def validate_findings_have_same_engagement(finding_objects: list[Finding]):
engagements = finding_objects.values_list("test__engagement__id", flat=True).distinct().count()
if engagements > 1:
msg = "You are not permitted to add findings from multiple engagements"
raise PermissionDenied(msg)

findings = data.get("accepted_findings", [])
findings_ids = [x.id for x in findings]
finding_objects = Finding.objects.filter(id__in=findings_ids)
authed_findings = get_authorized_findings("edit").filter(id__in=findings_ids)
if len(findings) != len(authed_findings):
msg = "You are not permitted to add one or more selected findings to this risk acceptance"
raise PermissionDenied(msg)
if self.context["request"].method == "POST":
validate_findings_have_same_engagement(finding_objects)

# Validate product allows full risk acceptance BEFORE creating instance
if finding_objects.exists():
engagement = finding_objects.first().test.engagement
if not engagement.product.enable_full_risk_acceptance:
msg = "Full risk acceptance is not enabled for this product"
raise PermissionDenied(msg)
elif self.context["request"].method in {"PATCH", "PUT"}:
# Use the reverse relation instead of filtering
existing_findings = self.instance.accepted_findings.all()
existing_and_new_findings = existing_findings | finding_objects
validate_findings_have_same_engagement(existing_and_new_findings)

# Explicit check to prevent engagement switching
risk_acceptance_engagement = self.instance.engagement
if risk_acceptance_engagement and finding_objects.exists():
new_findings_engagement = finding_objects.first().test.engagement
if risk_acceptance_engagement.id != new_findings_engagement.id:
msg = f"Risk Acceptance belongs to engagement {risk_acceptance_engagement.id}. Cannot add findings from engagement {new_findings_engagement.id}"
raise ValidationError(msg)
return data

class Meta:
model = Risk_Acceptance
fields = "__all__"


class CommonImportScanSerializer(serializers.Serializer):
scan_date = serializers.DateField(
required=False,
Expand Down
Loading