Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
256 changes: 10 additions & 246 deletions dojo/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
from rest_framework.exceptions import ValidationError as RestFrameworkValidationError

import dojo.risk_acceptance.helper as ra_helper
from dojo.endpoint.utils import endpoint_filter, endpoint_meta_import
from dojo.finding.queries import get_authorized_findings
from dojo.importers.auto_create_context import AutoCreateContextManager
from dojo.importers.base_importer import BaseImporter
Expand All @@ -37,8 +36,6 @@
Development_Environment,
DojoMeta,
Endpoint,
Endpoint_Params,
Endpoint_Status,
Engagement,
FileUpload,
Finding,
Expand All @@ -57,11 +54,7 @@
Sonarqube_Issue,
Sonarqube_Issue_Transition,
Test,
Tool_Configuration,
Tool_Product_Settings,
Tool_Type,
User,
get_current_date,
)
from dojo.product_announcements import (
LargeScanSizeProductAnnouncement,
Expand Down Expand Up @@ -412,19 +405,7 @@ class Meta:
fields = "__all__"


class ToolTypeSerializer(serializers.ModelSerializer):
class Meta:
model = Tool_Type
fields = "__all__"

def validate(self, data):
if self.context["request"].method == "POST":
name = data.get("name")
# Make sure this will not create a duplicate test type
if Tool_Type.objects.filter(name=name).count() > 0:
msg = "A Tool Type with the name already exists"
raise serializers.ValidationError(msg)
return data
from dojo.tool_type.api.serializer import ToolTypeSerializer # noqa: E402, F401 -- re-export


class RegulationSerializer(serializers.ModelSerializer):
Expand All @@ -433,171 +414,18 @@ class Meta:
fields = "__all__"


class ToolConfigurationSerializer(serializers.ModelSerializer):
class Meta:
model = Tool_Configuration
fields = "__all__"
extra_kwargs = {
"password": {"write_only": True},
"ssh": {"write_only": True},
"api_key": {"write_only": True},
}


class ToolProductSettingsSerializer(serializers.ModelSerializer):
setting_url = serializers.CharField(source="url")
product = serializers.PrimaryKeyRelatedField(
queryset=Product.objects.all(), required=True,
)

class Meta:
model = Tool_Product_Settings
fields = "__all__"


class EndpointStatusSerializer(serializers.ModelSerializer):
class Meta:
model = Endpoint_Status
fields = "__all__"

def run_validators(self, initial_data):
try:
return super().run_validators(initial_data)
except RestFrameworkValidationError as exc:
if "finding, endpoint must make a unique set" in str(exc):
msg = "This endpoint-finding relation already exists"
raise serializers.ValidationError(msg) from exc
raise

def create(self, validated_data):
endpoint = validated_data.get("endpoint")
finding = validated_data.get("finding")
try:
status = Endpoint_Status.objects.create(
finding=finding, endpoint=endpoint,
)
except IntegrityError as ie:
if "finding, endpoint must make a unique set" in str(ie):
msg = "This endpoint-finding relation already exists"
raise serializers.ValidationError(msg)
raise
status.mitigated = validated_data.get("mitigated", False)
status.false_positive = validated_data.get("false_positive", False)
status.out_of_scope = validated_data.get("out_of_scope", False)
status.risk_accepted = validated_data.get("risk_accepted", False)
status.date = validated_data.get("date", get_current_date())
status.save()
return status

def update(self, instance, validated_data):
try:
return super().update(instance, validated_data)
except IntegrityError as ie:
if "finding, endpoint must make a unique set" in str(ie):
msg = "This endpoint-finding relation already exists"
raise serializers.ValidationError(msg)
raise


class EndpointSerializer(serializers.ModelSerializer):
tags = TagListSerializerField(required=False)
active_finding_count = serializers.IntegerField(read_only=True)

class Meta:
model = Endpoint
exclude = ("inherited_tags",)

def validate(self, data):

if self.context["request"].method != "PATCH":
if "product" not in data:
msg = "Product is required"
raise serializers.ValidationError(msg)
protocol = data.get("protocol")
userinfo = data.get("userinfo")
host = data.get("host")
port = data.get("port")
path = data.get("path")
query = data.get("query")
fragment = data.get("fragment")
product = data.get("product")
else:
protocol = data.get("protocol", self.instance.protocol)
userinfo = data.get("userinfo", self.instance.userinfo)
host = data.get("host", self.instance.host)
port = data.get("port", self.instance.port)
path = data.get("path", self.instance.path)
query = data.get("query", self.instance.query)
fragment = data.get("fragment", self.instance.fragment)
if "product" in data and data["product"] != self.instance.product:
msg = "Change of product is not possible"
raise serializers.ValidationError(msg)
product = self.instance.product

endpoint_ins = Endpoint(
protocol=protocol,
userinfo=userinfo,
host=host,
port=port,
path=path,
query=query,
fragment=fragment,
product=product,
)
endpoint_ins.clean() # Run standard validation and clean process; can raise errors

endpoint = endpoint_filter(
protocol=endpoint_ins.protocol,
userinfo=endpoint_ins.userinfo,
host=endpoint_ins.host,
port=endpoint_ins.port,
path=endpoint_ins.path,
query=endpoint_ins.query,
fragment=endpoint_ins.fragment,
product=endpoint_ins.product,
)
if (
self.context["request"].method in {"PUT", "PATCH"}
and (
(endpoint.count() > 1)
or (
endpoint.count() == 1
and endpoint.first().pk != self.instance.pk
)
)
) or (
self.context["request"].method == "POST" and endpoint.count() > 0
):
msg = (
"It appears as though an endpoint with this data already "
"exists for this product."
)
raise serializers.ValidationError(msg, code="invalid")

# use clean data
data["protocol"] = endpoint_ins.protocol
data["userinfo"] = endpoint_ins.userinfo
data["host"] = endpoint_ins.host
data["port"] = endpoint_ins.port
data["path"] = endpoint_ins.path
data["query"] = endpoint_ins.query
data["fragment"] = endpoint_ins.fragment
data["product"] = endpoint_ins.product

return data


class EndpointParamsSerializer(serializers.ModelSerializer):
class Meta:
model = Endpoint_Params
fields = "__all__"


from dojo.jira.api.serializers import ( # noqa: E402, F401 backward compat
from dojo.endpoint.api.serializer import ( # noqa: E402, F401 -- re-export; prefetcher discovery requires all moved ModelSerializers here
EndpointParamsSerializer,
EndpointSerializer,
EndpointStatusSerializer,
)
from dojo.jira.api.serializers import ( # noqa: E402, F401 -- backward compat re-export
JIRAInstanceSerializer,
JIRAIssueSerializer,
JIRAProjectSerializer,
)
from dojo.tool_config.api.serializer import ToolConfigurationSerializer # noqa: E402, F401 -- re-export
from dojo.tool_product.api.serializer import ToolProductSettingsSerializer # noqa: E402, F401 -- re-export


class SonarqubeIssueSerializer(serializers.ModelSerializer):
Expand Down Expand Up @@ -1219,71 +1047,7 @@ def save(self, *, push_to_jira=False):
self.process_scan(auto_create_manager, data, context)


class EndpointMetaImporterSerializer(serializers.Serializer):
file = serializers.FileField(required=True)
create_endpoints = serializers.BooleanField(default=True, required=False)
create_tags = serializers.BooleanField(default=True, required=False)
create_dojo_meta = serializers.BooleanField(default=False, required=False)
product_name = serializers.CharField(required=False)
product = serializers.PrimaryKeyRelatedField(
queryset=Product.objects.all(), required=False,
)
# extra fields populated in response
# need to use the _id suffix as without the serializer framework gets
# confused
product_id = serializers.IntegerField(read_only=True)

def validate(self, data):
file = data.get("file")
if file and is_scan_file_too_large(file):
msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB"
raise serializers.ValidationError(msg)

return data

def save(self):
data = self.validated_data
file = data.get("file")
create_endpoints = data.get("create_endpoints", True)
create_tags = data.get("create_tags", True)
create_dojo_meta = data.get("create_dojo_meta", False)
auto_create = AutoCreateContextManager()
# Process the context to make an conversions needed. Catch any exceptions
# in this case and wrap them in a DRF exception
try:
auto_create.process_import_meta_data_from_dict(data)
# Get an existing product
product = auto_create.get_target_product_if_exists(**data)
if not product:
product = auto_create.get_target_product_by_id_if_exists(**data)
except (ValueError, TypeError) as e:
# Raise an explicit drf exception here
raise ValidationError(str(e))
try:
if settings.V3_FEATURE_LOCATIONS:
endpoint_meta_import(
file,
product,
create_endpoints,
create_tags,
create_dojo_meta,
origin="API",
object_class=Location,
)
else:
# TODO: Delete this after the move to Locations
endpoint_meta_import(
file,
product,
create_endpoints,
create_tags,
create_dojo_meta,
origin="API",
)
except SyntaxError as se:
raise Exception(se)
except ValueError as ve:
raise Exception(ve)
from dojo.endpoint.api.serializer import EndpointMetaImporterSerializer # noqa: E402, F401 -- re-export


class LanguageTypeSerializer(serializers.ModelSerializer):
Expand Down
Loading