diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index be48838afe0..2880f738fb1 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -20,7 +20,6 @@ from rest_framework.exceptions import ValidationError as RestFrameworkValidationError import dojo.risk_acceptance.helper as ra_helper -from dojo.endpoint.utils import endpoint_filter, endpoint_meta_import from dojo.finding.queries import get_authorized_findings from dojo.importers.auto_create_context import AutoCreateContextManager from dojo.importers.base_importer import BaseImporter @@ -37,8 +36,6 @@ Development_Environment, DojoMeta, Endpoint, - Endpoint_Params, - Endpoint_Status, Engagement, FileUpload, Finding, @@ -57,11 +54,7 @@ Sonarqube_Issue, Sonarqube_Issue_Transition, Test, - Tool_Configuration, - Tool_Product_Settings, - Tool_Type, User, - get_current_date, ) from dojo.product_announcements import ( LargeScanSizeProductAnnouncement, @@ -412,19 +405,7 @@ class Meta: fields = "__all__" -class ToolTypeSerializer(serializers.ModelSerializer): - class Meta: - model = Tool_Type - fields = "__all__" - - def validate(self, data): - if self.context["request"].method == "POST": - name = data.get("name") - # Make sure this will not create a duplicate test type - if Tool_Type.objects.filter(name=name).count() > 0: - msg = "A Tool Type with the name already exists" - raise serializers.ValidationError(msg) - return data +from dojo.tool_type.api.serializer import ToolTypeSerializer # noqa: E402, F401 -- re-export class RegulationSerializer(serializers.ModelSerializer): @@ -433,171 +414,18 @@ class Meta: fields = "__all__" -class ToolConfigurationSerializer(serializers.ModelSerializer): - class Meta: - model = Tool_Configuration - fields = "__all__" - extra_kwargs = { - "password": {"write_only": True}, - "ssh": {"write_only": True}, - "api_key": {"write_only": True}, - } - - -class ToolProductSettingsSerializer(serializers.ModelSerializer): - setting_url = serializers.CharField(source="url") - product = serializers.PrimaryKeyRelatedField( - queryset=Product.objects.all(), required=True, - ) - - class Meta: - model = Tool_Product_Settings - fields = "__all__" - - -class EndpointStatusSerializer(serializers.ModelSerializer): - class Meta: - model = Endpoint_Status - fields = "__all__" - - def run_validators(self, initial_data): - try: - return super().run_validators(initial_data) - except RestFrameworkValidationError as exc: - if "finding, endpoint must make a unique set" in str(exc): - msg = "This endpoint-finding relation already exists" - raise serializers.ValidationError(msg) from exc - raise - - def create(self, validated_data): - endpoint = validated_data.get("endpoint") - finding = validated_data.get("finding") - try: - status = Endpoint_Status.objects.create( - finding=finding, endpoint=endpoint, - ) - except IntegrityError as ie: - if "finding, endpoint must make a unique set" in str(ie): - msg = "This endpoint-finding relation already exists" - raise serializers.ValidationError(msg) - raise - status.mitigated = validated_data.get("mitigated", False) - status.false_positive = validated_data.get("false_positive", False) - status.out_of_scope = validated_data.get("out_of_scope", False) - status.risk_accepted = validated_data.get("risk_accepted", False) - status.date = validated_data.get("date", get_current_date()) - status.save() - return status - - def update(self, instance, validated_data): - try: - return super().update(instance, validated_data) - except IntegrityError as ie: - if "finding, endpoint must make a unique set" in str(ie): - msg = "This endpoint-finding relation already exists" - raise serializers.ValidationError(msg) - raise - - -class EndpointSerializer(serializers.ModelSerializer): - tags = TagListSerializerField(required=False) - active_finding_count = serializers.IntegerField(read_only=True) - - class Meta: - model = Endpoint - exclude = ("inherited_tags",) - - def validate(self, data): - - if self.context["request"].method != "PATCH": - if "product" not in data: - msg = "Product is required" - raise serializers.ValidationError(msg) - protocol = data.get("protocol") - userinfo = data.get("userinfo") - host = data.get("host") - port = data.get("port") - path = data.get("path") - query = data.get("query") - fragment = data.get("fragment") - product = data.get("product") - else: - protocol = data.get("protocol", self.instance.protocol) - userinfo = data.get("userinfo", self.instance.userinfo) - host = data.get("host", self.instance.host) - port = data.get("port", self.instance.port) - path = data.get("path", self.instance.path) - query = data.get("query", self.instance.query) - fragment = data.get("fragment", self.instance.fragment) - if "product" in data and data["product"] != self.instance.product: - msg = "Change of product is not possible" - raise serializers.ValidationError(msg) - product = self.instance.product - - endpoint_ins = Endpoint( - protocol=protocol, - userinfo=userinfo, - host=host, - port=port, - path=path, - query=query, - fragment=fragment, - product=product, - ) - endpoint_ins.clean() # Run standard validation and clean process; can raise errors - - endpoint = endpoint_filter( - protocol=endpoint_ins.protocol, - userinfo=endpoint_ins.userinfo, - host=endpoint_ins.host, - port=endpoint_ins.port, - path=endpoint_ins.path, - query=endpoint_ins.query, - fragment=endpoint_ins.fragment, - product=endpoint_ins.product, - ) - if ( - self.context["request"].method in {"PUT", "PATCH"} - and ( - (endpoint.count() > 1) - or ( - endpoint.count() == 1 - and endpoint.first().pk != self.instance.pk - ) - ) - ) or ( - self.context["request"].method == "POST" and endpoint.count() > 0 - ): - msg = ( - "It appears as though an endpoint with this data already " - "exists for this product." - ) - raise serializers.ValidationError(msg, code="invalid") - - # use clean data - data["protocol"] = endpoint_ins.protocol - data["userinfo"] = endpoint_ins.userinfo - data["host"] = endpoint_ins.host - data["port"] = endpoint_ins.port - data["path"] = endpoint_ins.path - data["query"] = endpoint_ins.query - data["fragment"] = endpoint_ins.fragment - data["product"] = endpoint_ins.product - - return data - - -class EndpointParamsSerializer(serializers.ModelSerializer): - class Meta: - model = Endpoint_Params - fields = "__all__" - - -from dojo.jira.api.serializers import ( # noqa: E402, F401 backward compat +from dojo.endpoint.api.serializer import ( # noqa: E402, F401 -- re-export; prefetcher discovery requires all moved ModelSerializers here + EndpointParamsSerializer, + EndpointSerializer, + EndpointStatusSerializer, +) +from dojo.jira.api.serializers import ( # noqa: E402, F401 -- backward compat re-export JIRAInstanceSerializer, JIRAIssueSerializer, JIRAProjectSerializer, ) +from dojo.tool_config.api.serializer import ToolConfigurationSerializer # noqa: E402, F401 -- re-export +from dojo.tool_product.api.serializer import ToolProductSettingsSerializer # noqa: E402, F401 -- re-export class SonarqubeIssueSerializer(serializers.ModelSerializer): @@ -1219,71 +1047,7 @@ def save(self, *, push_to_jira=False): self.process_scan(auto_create_manager, data, context) -class EndpointMetaImporterSerializer(serializers.Serializer): - file = serializers.FileField(required=True) - create_endpoints = serializers.BooleanField(default=True, required=False) - create_tags = serializers.BooleanField(default=True, required=False) - create_dojo_meta = serializers.BooleanField(default=False, required=False) - product_name = serializers.CharField(required=False) - product = serializers.PrimaryKeyRelatedField( - queryset=Product.objects.all(), required=False, - ) - # extra fields populated in response - # need to use the _id suffix as without the serializer framework gets - # confused - product_id = serializers.IntegerField(read_only=True) - - def validate(self, data): - file = data.get("file") - if file and is_scan_file_too_large(file): - msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB" - raise serializers.ValidationError(msg) - - return data - - def save(self): - data = self.validated_data - file = data.get("file") - create_endpoints = data.get("create_endpoints", True) - create_tags = data.get("create_tags", True) - create_dojo_meta = data.get("create_dojo_meta", False) - auto_create = AutoCreateContextManager() - # Process the context to make an conversions needed. Catch any exceptions - # in this case and wrap them in a DRF exception - try: - auto_create.process_import_meta_data_from_dict(data) - # Get an existing product - product = auto_create.get_target_product_if_exists(**data) - if not product: - product = auto_create.get_target_product_by_id_if_exists(**data) - except (ValueError, TypeError) as e: - # Raise an explicit drf exception here - raise ValidationError(str(e)) - try: - if settings.V3_FEATURE_LOCATIONS: - endpoint_meta_import( - file, - product, - create_endpoints, - create_tags, - create_dojo_meta, - origin="API", - object_class=Location, - ) - else: - # TODO: Delete this after the move to Locations - endpoint_meta_import( - file, - product, - create_endpoints, - create_tags, - create_dojo_meta, - origin="API", - ) - except SyntaxError as se: - raise Exception(se) - except ValueError as ve: - raise Exception(ve) +from dojo.endpoint.api.serializer import EndpointMetaImporterSerializer # noqa: E402, F401 -- re-export class LanguageTypeSerializer(serializers.ModelSerializer): diff --git a/dojo/api_v2/views.py b/dojo/api_v2/views.py index 50ddd53a4d5..97816b34f6c 100644 --- a/dojo/api_v2/views.py +++ b/dojo/api_v2/views.py @@ -9,8 +9,6 @@ from django.contrib.auth.models import Permission from django.core.exceptions import ValidationError from django.db import IntegrityError -from django.db.models import OuterRef, Value -from django.db.models.functions import Coalesce from django.db.models.query import QuerySet as DjangoQuerySet from django.http import FileResponse from django.urls import reverse @@ -39,15 +37,10 @@ ) from dojo.authorization import api_permissions as permissions from dojo.authorization.authorization import user_has_permission_or_403 -from dojo.endpoint.queries import ( - get_authorized_endpoint_status, - get_authorized_endpoints, -) -from dojo.endpoint.views import get_endpoint_ids +from dojo.endpoint.ui.views import get_endpoint_ids from dojo.filters import ( ApiAppAnalysisFilter, ApiDojoMetaFilter, - ApiEndpointFilter, ApiRiskAcceptanceFilter, ) from dojo.finding.ui.filters import ( @@ -64,7 +57,6 @@ Dojo_User, DojoMeta, Endpoint, - Endpoint_Status, Finding, Language_Type, Languages, @@ -80,9 +72,6 @@ Sonarqube_Issue_Transition, System_Settings, Test, - Tool_Configuration, - Tool_Product_Settings, - Tool_Type, ) from dojo.product.queries import ( get_authorized_app_analysis, @@ -90,7 +79,6 @@ get_authorized_languages, get_authorized_products, ) -from dojo.query_utils import build_count_subquery from dojo.reports.views import ( prefetch_related_findings_for_report, report_url_resolver, @@ -98,7 +86,6 @@ from dojo.risk_acceptance.helper import remove_finding_from_risk_acceptance from dojo.risk_acceptance.queries import get_authorized_risk_acceptances from dojo.test.queries import get_authorized_tests -from dojo.tool_product.queries import get_authorized_tool_product_settings from dojo.user.utils import get_configuration_permissions_codenames from dojo.utils import ( get_celery_queue_details, @@ -182,104 +169,6 @@ def finalize_response(self, request, response, *args, **kwargs): return super().finalize_response(request, response, *args, **kwargs) -# Authorization: authenticated users -# Authorization: object-based -# @extend_schema_view(**schema_with_prefetch()) -# Nested models with prefetch make the response schema too long for Swagger UI -class EndPointViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.EndpointSerializer - queryset = Endpoint.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_class = ApiEndpointFilter - - permission_classes = ( - IsAuthenticated, - permissions.UserHasEndpointPermission, - ) - - def get_queryset(self): - active_finding_subquery = build_count_subquery( - Finding.objects.filter(endpoints=OuterRef("pk"), active=True), - group_field="endpoints", - ) - return get_authorized_endpoints("view").annotate( - active_finding_count=Coalesce(active_finding_subquery, Value(0)), - ).distinct() - - @extend_schema( - request=serializers.ReportGenerateOptionSerializer, - responses={status.HTTP_200_OK: serializers.ReportGenerateSerializer}, - ) - @action( - detail=True, methods=["post"], - # IsAuthenticated only: report generation requires View permission, - # enforced by the permission-filtered get_queryset(). The viewset's - # permission_classes would check Edit (POST), which is too restrictive. - permission_classes=[IsAuthenticated], - ) - def generate_report(self, request, pk=None): - endpoint = self.get_object() - - options = {} - # prepare post data - report_options = serializers.ReportGenerateOptionSerializer( - data=request.data, - ) - if report_options.is_valid(): - options["include_finding_notes"] = report_options.validated_data[ - "include_finding_notes" - ] - options["include_finding_images"] = report_options.validated_data[ - "include_finding_images" - ] - options[ - "include_executive_summary" - ] = report_options.validated_data["include_executive_summary"] - options[ - "include_table_of_contents" - ] = report_options.validated_data["include_table_of_contents"] - else: - return Response( - report_options.errors, status=status.HTTP_400_BAD_REQUEST, - ) - - data = report_generate(request, endpoint, options) - report = serializers.ReportGenerateSerializer(data) - return Response(report.data) - - -# Authorization: object-based -# @extend_schema_view(**schema_with_prefetch()) -# Nested models with prefetch make the response schema too long for Swagger UI -class EndpointStatusViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.EndpointStatusSerializer - queryset = Endpoint_Status.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "mitigated", - "false_positive", - "out_of_scope", - "risk_accepted", - "mitigated_by", - "finding", - "endpoint", - ] - - permission_classes = ( - IsAuthenticated, - permissions.UserHasEndpointStatusPermission, - ) - - def get_queryset(self): - return get_authorized_endpoint_status( - "view", - ).distinct() - - # @extend_schema_view(**schema_with_prefetch()) # Nested models with prefetch make the response schema too long for Swagger UI class RiskAcceptanceViewSet( @@ -574,66 +463,6 @@ def get_queryset(self): return Development_Environment.objects.all().order_by("id") -# Authorization: configurations -@extend_schema_view(**schema_with_prefetch()) -class ToolConfigurationsViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.ToolConfigurationSerializer - queryset = Tool_Configuration.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "id", - "name", - "tool_type", - "url", - "authentication_type", - ] - permission_classes = (permissions.UserHasConfigurationPermissionSuperuser,) - - def get_queryset(self): - return Tool_Configuration.objects.all().order_by("id") - - -# Authorization: object-based -@extend_schema_view(**schema_with_prefetch()) -class ToolProductSettingsViewSet( - PrefetchDojoModelViewSet, -): - serializer_class = serializers.ToolProductSettingsSerializer - queryset = Tool_Product_Settings.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = [ - "id", - "name", - "product", - "tool_configuration", - "tool_project_id", - "url", - ] - permission_classes = ( - IsAuthenticated, - permissions.UserHasToolProductSettingsPermission, - ) - - def get_queryset(self): - return get_authorized_tool_product_settings("view") - - -# Authorization: configuration -class ToolTypesViewSet( - DojoModelViewSet, -): - serializer_class = serializers.ToolTypeSerializer - queryset = Tool_Type.objects.none() - filter_backends = (DjangoFilterBackend,) - filterset_fields = ["id", "name", "description"] - permission_classes = (permissions.UserHasConfigurationPermissionSuperuser,) - - def get_queryset(self): - return Tool_Type.objects.all().order_by("id") - - # Authorization: authenticated, configuration class RegulationsViewSet( DojoModelViewSet, @@ -720,38 +549,6 @@ def get_queryset(self): return get_authorized_tests("import") -# Authorization: authenticated users, DjangoModelPermissions -class EndpointMetaImporterView( - mixins.CreateModelMixin, viewsets.GenericViewSet, -): - - """ - Imports a CSV file into a product to propagate arbitrary meta and tags on endpoints. - - By Names: - - Provide `product_name` of existing product - - By ID: - - Provide the id of the product in the `product` parameter - - In this scenario Defect Dojo will look up the product by the provided details. - """ - - serializer_class = serializers.EndpointMetaImporterSerializer - parser_classes = [MultiPartParser] - queryset = Product.objects.none() - permission_classes = ( - IsAuthenticated, - permissions.UserHasMetaImportPermission, - ) - - def perform_create(self, serializer): - serializer.save() - - def get_queryset(self): - return get_authorized_products("edit") - - # Authorization: configuration class LanguageTypeViewSet( DojoModelViewSet, diff --git a/dojo/endpoint/__init__.py b/dojo/endpoint/__init__.py index e69de29bb2d..d774cc434b1 100644 --- a/dojo/endpoint/__init__.py +++ b/dojo/endpoint/__init__.py @@ -0,0 +1 @@ +import dojo.endpoint.admin # noqa: F401 diff --git a/dojo/endpoint/admin.py b/dojo/endpoint/admin.py new file mode 100644 index 00000000000..1a56f8d89d8 --- /dev/null +++ b/dojo/endpoint/admin.py @@ -0,0 +1,10 @@ +import tagulous.admin +from django.contrib import admin + +from dojo.endpoint.models import Endpoint, Endpoint_Params, Endpoint_Status + +admin.site.register(Endpoint_Params) +admin.site.register(Endpoint_Status) +admin.site.register(Endpoint) +tagulous.admin.register(Endpoint.tags) +tagulous.admin.register(Endpoint.inherited_tags) diff --git a/dojo/endpoint/api/__init__.py b/dojo/endpoint/api/__init__.py new file mode 100644 index 00000000000..0aa96944499 --- /dev/null +++ b/dojo/endpoint/api/__init__.py @@ -0,0 +1 @@ +path = "endpoints" # noqa: RUF067 diff --git a/dojo/endpoint/api/filters.py b/dojo/endpoint/api/filters.py new file mode 100644 index 00000000000..6dda92ab41a --- /dev/null +++ b/dojo/endpoint/api/filters.py @@ -0,0 +1,40 @@ +from django_filters import ( + BooleanFilter, + CharFilter, + OrderingFilter, +) + +from dojo.endpoint.models import Endpoint +from dojo.filters import CharFieldFilterANDExpression, CharFieldInFilter, DojoFilter + + +class ApiEndpointFilter(DojoFilter): + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") + tags = CharFieldInFilter( + field_name="tags__name", + lookup_expr="in", + help_text="Comma separated list of exact tags (uses OR for multiple values)") + tags__and = CharFieldFilterANDExpression( + field_name="tags__name", + help_text="Comma separated list of exact tags to match with an AND expression") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") + not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", + help_text="Comma separated list of exact tags not present on model", exclude="True") + has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("host", "host"), + ("product", "product"), + ("id", "id"), + ("active_finding_count", "active_finding_count"), + ), + field_labels={ + "active_finding_count": "Active Findings Count", + }, + ) + + class Meta: + model = Endpoint + fields = ["id", "protocol", "userinfo", "host", "port", "path", "query", "fragment", "product"] diff --git a/dojo/endpoint/api/serializer.py b/dojo/endpoint/api/serializer.py new file mode 100644 index 00000000000..05e140d3f65 --- /dev/null +++ b/dojo/endpoint/api/serializer.py @@ -0,0 +1,230 @@ +import logging + +from django.conf import settings +from django.core.exceptions import ValidationError +from django.db.utils import IntegrityError +from rest_framework import serializers +from rest_framework.exceptions import ValidationError as RestFrameworkValidationError + +from dojo.endpoint.models import Endpoint, Endpoint_Params, Endpoint_Status +from dojo.endpoint.utils import endpoint_filter, endpoint_meta_import +from dojo.importers.auto_create_context import AutoCreateContextManager +from dojo.location.models import Location +from dojo.models import Product, get_current_date +from dojo.utils import is_scan_file_too_large + +logger = logging.getLogger(__name__) + + +class EndpointStatusSerializer(serializers.ModelSerializer): + class Meta: + model = Endpoint_Status + fields = "__all__" + + def run_validators(self, initial_data): + try: + return super().run_validators(initial_data) + except RestFrameworkValidationError as exc: + if "finding, endpoint must make a unique set" in str(exc): + msg = "This endpoint-finding relation already exists" + raise serializers.ValidationError(msg) from exc + raise + + def create(self, validated_data): + endpoint = validated_data.get("endpoint") + finding = validated_data.get("finding") + try: + status = Endpoint_Status.objects.create( + finding=finding, endpoint=endpoint, + ) + except IntegrityError as ie: + if "finding, endpoint must make a unique set" in str(ie): + msg = "This endpoint-finding relation already exists" + raise serializers.ValidationError(msg) + raise + status.mitigated = validated_data.get("mitigated", False) + status.false_positive = validated_data.get("false_positive", False) + status.out_of_scope = validated_data.get("out_of_scope", False) + status.risk_accepted = validated_data.get("risk_accepted", False) + status.date = validated_data.get("date", get_current_date()) + status.save() + return status + + def update(self, instance, validated_data): + try: + return super().update(instance, validated_data) + except IntegrityError as ie: + if "finding, endpoint must make a unique set" in str(ie): + msg = "This endpoint-finding relation already exists" + raise serializers.ValidationError(msg) + raise + + +class EndpointSerializer(serializers.ModelSerializer): + # tags field uses lazy get_fields() to break the import cycle: + # EndpointSerializer -> TagListSerializerField -> api_v2.serializers -> EndpointSerializer + active_finding_count = serializers.IntegerField(read_only=True) + + class Meta: + model = Endpoint + exclude = ("inherited_tags",) + + def get_fields(self): + fields = super().get_fields() + from dojo.api_v2.serializers import ( # noqa: PLC0415 -- lazy import, avoids circular dependency + TagListSerializerField, + ) + fields["tags"] = TagListSerializerField(required=False) + return fields + + def validate(self, data): + + if self.context["request"].method != "PATCH": + if "product" not in data: + msg = "Product is required" + raise serializers.ValidationError(msg) + protocol = data.get("protocol") + userinfo = data.get("userinfo") + host = data.get("host") + port = data.get("port") + path = data.get("path") + query = data.get("query") + fragment = data.get("fragment") + product = data.get("product") + else: + protocol = data.get("protocol", self.instance.protocol) + userinfo = data.get("userinfo", self.instance.userinfo) + host = data.get("host", self.instance.host) + port = data.get("port", self.instance.port) + path = data.get("path", self.instance.path) + query = data.get("query", self.instance.query) + fragment = data.get("fragment", self.instance.fragment) + if "product" in data and data["product"] != self.instance.product: + msg = "Change of product is not possible" + raise serializers.ValidationError(msg) + product = self.instance.product + + endpoint_ins = Endpoint( + protocol=protocol, + userinfo=userinfo, + host=host, + port=port, + path=path, + query=query, + fragment=fragment, + product=product, + ) + endpoint_ins.clean() # Run standard validation and clean process; can raise errors + + endpoint = endpoint_filter( + protocol=endpoint_ins.protocol, + userinfo=endpoint_ins.userinfo, + host=endpoint_ins.host, + port=endpoint_ins.port, + path=endpoint_ins.path, + query=endpoint_ins.query, + fragment=endpoint_ins.fragment, + product=endpoint_ins.product, + ) + if ( + self.context["request"].method in {"PUT", "PATCH"} + and ( + (endpoint.count() > 1) + or ( + endpoint.count() == 1 + and endpoint.first().pk != self.instance.pk + ) + ) + ) or ( + self.context["request"].method == "POST" and endpoint.count() > 0 + ): + msg = ( + "It appears as though an endpoint with this data already " + "exists for this product." + ) + raise serializers.ValidationError(msg, code="invalid") + + # use clean data + data["protocol"] = endpoint_ins.protocol + data["userinfo"] = endpoint_ins.userinfo + data["host"] = endpoint_ins.host + data["port"] = endpoint_ins.port + data["path"] = endpoint_ins.path + data["query"] = endpoint_ins.query + data["fragment"] = endpoint_ins.fragment + data["product"] = endpoint_ins.product + + return data + + +class EndpointParamsSerializer(serializers.ModelSerializer): + class Meta: + model = Endpoint_Params + fields = "__all__" + + +class EndpointMetaImporterSerializer(serializers.Serializer): + file = serializers.FileField(required=True) + create_endpoints = serializers.BooleanField(default=True, required=False) + create_tags = serializers.BooleanField(default=True, required=False) + create_dojo_meta = serializers.BooleanField(default=False, required=False) + product_name = serializers.CharField(required=False) + product = serializers.PrimaryKeyRelatedField( + queryset=Product.objects.all(), required=False, + ) + # extra fields populated in response + # need to use the _id suffix as without the serializer framework gets + # confused + product_id = serializers.IntegerField(read_only=True) + + def validate(self, data): + file = data.get("file") + if file and is_scan_file_too_large(file): + msg = f"Report file is too large. Maximum supported size is {settings.SCAN_FILE_MAX_SIZE} MB" + raise serializers.ValidationError(msg) + + return data + + def save(self): + data = self.validated_data + file = data.get("file") + create_endpoints = data.get("create_endpoints", True) + create_tags = data.get("create_tags", True) + create_dojo_meta = data.get("create_dojo_meta", False) + auto_create = AutoCreateContextManager() + # Process the context to make an conversions needed. Catch any exceptions + # in this case and wrap them in a DRF exception + try: + auto_create.process_import_meta_data_from_dict(data) + # Get an existing product + product = auto_create.get_target_product_if_exists(**data) + if not product: + product = auto_create.get_target_product_by_id_if_exists(**data) + except (ValueError, TypeError) as e: + # Raise an explicit drf exception here + raise ValidationError(str(e)) + try: + if settings.V3_FEATURE_LOCATIONS: + endpoint_meta_import( + file, + product, + create_endpoints, + create_tags, + create_dojo_meta, + origin="API", + object_class=Location, + ) + else: + # TODO: Delete this after the move to Locations + endpoint_meta_import( + file, + product, + create_endpoints, + create_tags, + create_dojo_meta, + origin="API", + ) + except SyntaxError as se: + raise Exception(se) + except ValueError as ve: + raise Exception(ve) diff --git a/dojo/endpoint/api/urls.py b/dojo/endpoint/api/urls.py new file mode 100644 index 00000000000..4a138f3cbde --- /dev/null +++ b/dojo/endpoint/api/urls.py @@ -0,0 +1,20 @@ +from dojo.endpoint.api.views import EndpointMetaImporterView, EndpointStatusViewSet, EndPointViewSet + + +def add_endpoint_urls(router): + """ + Register endpoint/endpoint_status routes (non-V3 block only). + + endpoint_meta_import is always registered via register_endpoint_meta_import. + endpoints and endpoint_status are registered only when V3_FEATURE_LOCATIONS is OFF; + the V3 compat viewsets are registered by dojo/location/api/urls.py instead. + """ + router.register(r"endpoints", EndPointViewSet, basename="endpoint") + router.register(r"endpoint_status", EndpointStatusViewSet, basename="endpoint_status") + return router + + +def register_endpoint_meta_import(router): + """Register the unconditional endpoint_meta_import route.""" + router.register(r"endpoint_meta_import", EndpointMetaImporterView, basename="endpointmetaimport") + return router diff --git a/dojo/endpoint/api/views.py b/dojo/endpoint/api/views.py new file mode 100644 index 00000000000..02a6f65f31f --- /dev/null +++ b/dojo/endpoint/api/views.py @@ -0,0 +1,161 @@ +import logging + +from django.db.models import OuterRef, Value +from django.db.models.functions import Coalesce +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema +from rest_framework import mixins, status, viewsets +from rest_framework.decorators import action +from rest_framework.parsers import MultiPartParser +from rest_framework.permissions import IsAuthenticated +from rest_framework.response import Response + +from dojo.api_v2 import serializers as api_v2_serializers +from dojo.api_v2.views import PrefetchDojoModelViewSet, report_generate +from dojo.authorization import api_permissions as permissions +from dojo.endpoint.api.filters import ApiEndpointFilter +from dojo.endpoint.api.serializer import ( + EndpointMetaImporterSerializer, + EndpointSerializer, + EndpointStatusSerializer, +) +from dojo.endpoint.models import Endpoint, Endpoint_Status +from dojo.endpoint.queries import ( + get_authorized_endpoint_status, + get_authorized_endpoints, +) +from dojo.models import Finding +from dojo.product.queries import get_authorized_products +from dojo.query_utils import build_count_subquery + +logger = logging.getLogger(__name__) + + +# Authorization: authenticated users +# Authorization: object-based +# @extend_schema_view(**schema_with_prefetch()) +# Nested models with prefetch make the response schema too long for Swagger UI +class EndPointViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = EndpointSerializer + queryset = Endpoint.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_class = ApiEndpointFilter + + permission_classes = ( + IsAuthenticated, + permissions.UserHasEndpointPermission, + ) + + def get_queryset(self): + active_finding_subquery = build_count_subquery( + Finding.objects.filter(endpoints=OuterRef("pk"), active=True), + group_field="endpoints", + ) + return get_authorized_endpoints("view").annotate( + active_finding_count=Coalesce(active_finding_subquery, Value(0)), + ).distinct() + + @extend_schema( + request=api_v2_serializers.ReportGenerateOptionSerializer, + responses={status.HTTP_200_OK: api_v2_serializers.ReportGenerateSerializer}, + ) + @action( + detail=True, methods=["post"], + # IsAuthenticated only: report generation requires View permission, + # enforced by the permission-filtered get_queryset(). The viewset's + # permission_classes would check Edit (POST), which is too restrictive. + permission_classes=[IsAuthenticated], + ) + def generate_report(self, request, pk=None): + endpoint = self.get_object() + + options = {} + # prepare post data + report_options = api_v2_serializers.ReportGenerateOptionSerializer( + data=request.data, + ) + if report_options.is_valid(): + options["include_finding_notes"] = report_options.validated_data[ + "include_finding_notes" + ] + options["include_finding_images"] = report_options.validated_data[ + "include_finding_images" + ] + options[ + "include_executive_summary" + ] = report_options.validated_data["include_executive_summary"] + options[ + "include_table_of_contents" + ] = report_options.validated_data["include_table_of_contents"] + else: + return Response( + report_options.errors, status=status.HTTP_400_BAD_REQUEST, + ) + + data = report_generate(request, endpoint, options) + report = api_v2_serializers.ReportGenerateSerializer(data) + return Response(report.data) + + +# Authorization: object-based +# @extend_schema_view(**schema_with_prefetch()) +# Nested models with prefetch make the response schema too long for Swagger UI +class EndpointStatusViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = EndpointStatusSerializer + queryset = Endpoint_Status.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "mitigated", + "false_positive", + "out_of_scope", + "risk_accepted", + "mitigated_by", + "finding", + "endpoint", + ] + + permission_classes = ( + IsAuthenticated, + permissions.UserHasEndpointStatusPermission, + ) + + def get_queryset(self): + return get_authorized_endpoint_status( + "view", + ).distinct() + + +# Authorization: authenticated users, DjangoModelPermissions +class EndpointMetaImporterView( + mixins.CreateModelMixin, viewsets.GenericViewSet, +): + + """ + Imports a CSV file into a product to propagate arbitrary meta and tags on endpoints. + + By Names: + - Provide `product_name` of existing product + + By ID: + - Provide the id of the product in the `product` parameter + + In this scenario Defect Dojo will look up the product by the provided details. + """ + + serializer_class = EndpointMetaImporterSerializer + parser_classes = [MultiPartParser] + queryset = Finding.objects.none() + permission_classes = ( + IsAuthenticated, + permissions.UserHasMetaImportPermission, + ) + + def perform_create(self, serializer): + serializer.save() + + def get_queryset(self): + return get_authorized_products("edit") diff --git a/dojo/endpoint/models.py b/dojo/endpoint/models.py new file mode 100644 index 00000000000..f55e3f5f8c0 --- /dev/null +++ b/dojo/endpoint/models.py @@ -0,0 +1,464 @@ +import contextlib +import logging +import re +from urllib.parse import urlparse + +import hyperlink +from django.conf import settings +from django.core.exceptions import ValidationError +from django.core.validators import validate_ipv46_address +from django.db import connection, models +from django.db.models import F, Q +from django.db.models.functions import Lower +from django.urls import reverse +from django.utils.translation import gettext as _ +from tagulous.models import TagField + +# get_current_date/get_current_datetime/copy_model_util are defined early in dojo.models, +# before the re-export that loads this module — resolves despite partial circular load. +# Must keep their dojo.models.* path for Django migration serialization. +from dojo.models import copy_model_util, get_current_date, get_current_datetime + +logger = logging.getLogger(__name__) + + +class Endpoint_Params(models.Model): + param = models.CharField(max_length=150) + value = models.CharField(max_length=150) + method_type = (("GET", "GET"), + ("POST", "POST")) + method = models.CharField(max_length=20, blank=False, null=True, choices=method_type) + + +class Endpoint_Status(models.Model): + date = models.DateField(default=get_current_date) + last_modified = models.DateTimeField(null=True, editable=False, default=get_current_datetime) + mitigated = models.BooleanField(default=False, blank=True) + mitigated_time = models.DateTimeField(editable=False, null=True, blank=True) + mitigated_by = models.ForeignKey("dojo.Dojo_User", editable=True, null=True, on_delete=models.RESTRICT) + false_positive = models.BooleanField(default=False, blank=True) + out_of_scope = models.BooleanField(default=False, blank=True) + risk_accepted = models.BooleanField(default=False, blank=True) + endpoint = models.ForeignKey("dojo.Endpoint", null=False, blank=False, on_delete=models.CASCADE, related_name="status_endpoint") + finding = models.ForeignKey("dojo.Finding", null=False, blank=False, on_delete=models.CASCADE, related_name="status_finding") + + class Meta: + indexes = [ + models.Index(fields=["finding", "mitigated"]), + models.Index(fields=["endpoint", "mitigated"]), + # Optimize frequent lookups of "active" statuses (mitigated/flags all False) + models.Index( + name="idx_eps_active_by_endpoint", + fields=["endpoint"], + condition=Q(mitigated=False, false_positive=False, out_of_scope=False, risk_accepted=False), + ), + models.Index( + name="idx_eps_active_by_finding", + fields=["finding"], + condition=Q(mitigated=False, false_positive=False, out_of_scope=False, risk_accepted=False), + ), + ] + constraints = [ + models.UniqueConstraint(fields=["finding", "endpoint"], name="endpoint-finding relation"), + ] + + def __str__(self): + with Endpoint.allow_endpoint_init(): # TODO: Delete this after the move to Locations + return f"'{self.finding}' on '{self.endpoint}'" + + def copy(self, finding=None): + copy = copy_model_util(self) + current_endpoint = self.endpoint + if finding: + copy.finding = finding + copy.endpoint = current_endpoint + copy.save() + + return copy + + @property + def age(self): + + diff = self.mitigated_time.date() - self.date if self.mitigated else get_current_date() - self.date + days = diff.days + return max(0, days) + + +class Endpoint(models.Model): + protocol = models.CharField(null=True, blank=True, max_length=20, + help_text=_("The communication protocol/scheme such as 'http', 'ftp', 'dns', etc.")) + userinfo = models.CharField(null=True, blank=True, max_length=500, + help_text=_("User info as 'alice', 'bob', etc.")) + host = models.CharField(null=True, blank=True, max_length=500, + help_text=_("The host name or IP address. It must not include the port number. " + "For example '127.0.0.1', 'localhost', 'yourdomain.com'.")) + port = models.IntegerField(null=True, blank=True, + help_text=_("The network port associated with the endpoint.")) + path = models.CharField(null=True, blank=True, max_length=500, + help_text=_("The location of the resource, it must not start with a '/'. For example " + "endpoint/420/edit")) + query = models.CharField(null=True, blank=True, max_length=1000, + help_text=_("The query string, the question mark should be omitted." + "For example 'group=4&team=8'")) + fragment = models.CharField(null=True, blank=True, max_length=500, + help_text=_("The fragment identifier which follows the hash mark. The hash mark should " + "be omitted. For example 'section-13', 'paragraph-2'.")) + product = models.ForeignKey("dojo.Product", null=True, blank=True, on_delete=models.CASCADE) + endpoint_params = models.ManyToManyField("dojo.Endpoint_Params", blank=True, editable=False) + findings = models.ManyToManyField("dojo.Finding", + blank=True, + verbose_name=_("Findings"), + through="dojo.Endpoint_Status") + + tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this endpoint. Choose from the list or add new tags. Press Enter key to add.")) + inherited_tags = TagField(blank=True, force_lowercase=True, help_text=_("Internal use tags sepcifically for maintaining parity with product. This field will be present as a subset in the tags field")) + + class Meta: + ordering = ["product", "host", "protocol", "port", "userinfo", "path", "query", "fragment"] + indexes = [ + models.Index(fields=["product"]), + # Fast case-insensitive equality on host within product scope + models.Index( + F("product"), + Lower("host"), + name="idx_ep_product_lower_host", + ), + ] + + def __init__(self, *args, **kwargs): + if settings.V3_FEATURE_LOCATIONS and not getattr(self, "_allow_v3_init", False): + msg = "Endpoint model is deprecated when V3_FEATURE_LOCATIONS is enabled" + raise NotImplementedError(msg) + super().__init__(*args, **kwargs) + + def __hash__(self): + return self.__str__().__hash__() + + def __eq__(self, other): + if isinstance(other, Endpoint): + contents_match = str(self) == str(other) + # Use product_id (cached integer) instead of self.product to avoid + # triggering a FK lookup on every comparison inside NestedObjects.add_edge. + if self.product_id is not None and other.product_id is not None: + return self.product_id == other.product_id and contents_match + return contents_match + + return NotImplemented + + def __str__(self): + try: + if self.host: + dummy_scheme = "dummy-scheme" # workaround for https://github.com/python-hyper/hyperlink/blob/b8c9152cd826bbe8e6cc125648f3738235019705/src/hyperlink/_url.py#L988 + url = hyperlink.EncodedURL( + scheme=self.protocol or dummy_scheme, + userinfo=self.userinfo or "", + host=self.host, + port=self.port, + path=tuple(self.path.split("/")) if self.path else (), + query=tuple( + ( + qe.split("=", 1) + if "=" in qe + else (qe, None) + ) + for qe in self.query.split("&") + ) if self.query else (), # inspired by https://github.com/python-hyper/hyperlink/blob/b8c9152cd826bbe8e6cc125648f3738235019705/src/hyperlink/_url.py#L1427 + fragment=self.fragment or "", + ) + # Return a normalized version of the URL to avoid differences where there shouldn't be any difference. + # Example: https://google.com and https://google.com:443 + normalize_path = self.path # it used to add '/' at the end of host + clean_url = url.normalize(scheme=True, host=True, path=normalize_path, query=True, fragment=True, userinfo=True, percents=True).to_uri().to_text() + if not self.protocol: + if clean_url[:len(dummy_scheme) + 3] == (dummy_scheme + "://"): + clean_url = clean_url[len(dummy_scheme) + 3:] + else: + msg = "hyperlink lib did not create URL as was expected" + raise ValueError(msg) + return clean_url + msg = "Missing host" + raise ValueError(msg) + except: + url = "" + if self.protocol: + url += f"{self.protocol}://" + if self.userinfo: + url += f"{self.userinfo}@" + if self.host: + url += self.host + if self.port: + url += f":{self.port}" + if self.path: + url += "{}{}".format("/" if self.path[0] != "/" else "", self.path) + if self.query: + url += f"?{self.query}" + if self.fragment: + url += f"#{self.fragment}" + return url + + def get_absolute_url(self): + return reverse("view_endpoint", args=[str(self.id)]) + + @classmethod + @contextlib.contextmanager + def allow_endpoint_init(cls): + # When migrating to Locations, Endpoints are not deleted (hooray backup!). Disallowing the initialization of + # Endpoints is a good way to catch where they might still be used (oops!). However, there are some circumstances + # -- object deletes -- where Django itself attempts to instantiate an Endpoint object. This, we need to allow: + # if a user wants to delete an object, including whatever Endpoints are attached to it, they should be able to. + # This context manager allows code to initialize Endpoints at our discretion. + old = getattr(cls, "_allow_v3_init", None) + cls._allow_v3_init = True + try: + yield + finally: + cls._allow_v3_init = old + + def clean(self): + errors = [] + null_char_list = ["0x00", "\x00"] + db_type = connection.vendor + if self.protocol is not None: + if not re.match(r"^[A-Za-z][A-Za-z0-9\.\-\+]+$", self.protocol): # https://tools.ietf.org/html/rfc3986#section-3.1 + errors.append(ValidationError(f'Protocol "{self.protocol}" has invalid format')) + if not self.protocol: + self.protocol = None + + if self.userinfo is not None: + if not re.match(r"^[A-Za-z0-9\.\-_~%\!\$&\'\(\)\*\+,;=:]+$", self.userinfo): # https://tools.ietf.org/html/rfc3986#section-3.2.1 + errors.append(ValidationError(f'Userinfo "{self.userinfo}" has invalid format')) + if not self.userinfo: + self.userinfo = None + + if self.host: + if not re.match(r"^[A-Za-z0-9_\-\+][A-Za-z0-9_\.\-\+]+$", self.host): + try: + validate_ipv46_address(self.host) + except ValidationError: + errors.append(ValidationError(f'Host "{self.host}" has invalid format')) + else: + errors.append(ValidationError("Host must not be empty")) + + if self.port is not None: + try: + int_port = int(self.port) + if not (0 <= int_port < 65536): + errors.append(ValidationError(f'Port "{self.port}" has invalid format - out of range')) + self.port = int_port + except ValueError: + errors.append(ValidationError(f'Port "{self.port}" has invalid format - it is not a number')) + + if self.path is not None: + while len(self.path) > 0 and self.path[0] == "/": # Endpoint store "root-less" path + self.path = self.path[1:] + if any(null_char in self.path for null_char in null_char_list): + old_value = self.path + if "postgres" in db_type: + action_string = "Postgres does not accept NULL character. Attempting to replace with %00..." + for remove_str in null_char_list: + self.path = self.path.replace(remove_str, "%00") + logger.error('Path "%s" has invalid format - It contains the NULL character. The following action was taken: %s', old_value, action_string) + if not self.path: + self.path = None + + if self.query is not None: + if len(self.query) > 0 and self.query[0] == "?": + self.query = self.query[1:] + if any(null_char in self.query for null_char in null_char_list): + old_value = self.query + if "postgres" in db_type: + action_string = "Postgres does not accept NULL character. Attempting to replace with %00..." + for remove_str in null_char_list: + self.query = self.query.replace(remove_str, "%00") + logger.error('Query "%s" has invalid format - It contains the NULL character. The following action was taken: %s', old_value, action_string) + if not self.query: + self.query = None + + if self.fragment is not None: + if len(self.fragment) > 0 and self.fragment[0] == "#": + self.fragment = self.fragment[1:] + if any(null_char in self.fragment for null_char in null_char_list): + old_value = self.fragment + if "postgres" in db_type: + action_string = "Postgres does not accept NULL character. Attempting to replace with %00..." + for remove_str in null_char_list: + self.fragment = self.fragment.replace(remove_str, "%00") + logger.error('Fragment "%s" has invalid format - It contains the NULL character. The following action was taken: %s', old_value, action_string) + if not self.fragment: + self.fragment = None + + if errors: + raise ValidationError(errors) + + @property + def is_broken(self): + try: + self.clean() + except: + return True + else: + return not self.product + + @property + def mitigated(self): + return not self.vulnerable + + @property + def vulnerable(self): + return Endpoint_Status.objects.filter( + endpoint=self, + mitigated=False, + false_positive=False, + out_of_scope=False, + risk_accepted=False, + ).count() > 0 + + @property + def findings_count(self): + return self.findings.all().count() + + def active_findings(self): + return self.findings.filter( + active=True, + out_of_scope=False, + mitigated__isnull=True, + false_p=False, + duplicate=False, + status_finding__false_positive=False, + status_finding__out_of_scope=False, + status_finding__risk_accepted=False, + ).order_by("numerical_severity") + + def active_verified_findings(self): + return self.findings.filter( + active=True, + verified=True, + out_of_scope=False, + mitigated__isnull=True, + false_p=False, + duplicate=False, + status_finding__false_positive=False, + status_finding__out_of_scope=False, + status_finding__risk_accepted=False, + ).order_by("numerical_severity") + + @property + def active_findings_count(self): + return self.active_findings().count() + + @property + def active_verified_findings_count(self): + return self.active_verified_findings().count() + + def host_endpoints(self): + return Endpoint.objects.filter(host=self.host, + product=self.product).distinct() + + @property + def host_endpoints_count(self): + return self.host_endpoints().count() + + def host_mitigated_endpoints(self): + meps = Endpoint_Status.objects \ + .filter(endpoint__in=self.host_endpoints()) \ + .filter(Q(mitigated=True) + | Q(false_positive=True) + | Q(out_of_scope=True) + | Q(risk_accepted=True) + | Q(finding__out_of_scope=True) + | Q(finding__mitigated__isnull=False) + | Q(finding__false_p=True) + | Q(finding__duplicate=True) + | Q(finding__active=False)) + return Endpoint.objects.filter(status_endpoint__in=meps).distinct() + + @property + def host_mitigated_endpoints_count(self): + return self.host_mitigated_endpoints().count() + + def host_findings(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + return Finding.objects.filter(endpoints__in=self.host_endpoints()).distinct() + + @property + def host_findings_count(self): + return self.host_findings().count() + + def host_active_findings(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + return Finding.objects.filter( + active=True, + out_of_scope=False, + mitigated__isnull=True, + false_p=False, + duplicate=False, + status_finding__false_positive=False, + status_finding__out_of_scope=False, + status_finding__risk_accepted=False, + endpoints__in=self.host_endpoints(), + ).order_by("numerical_severity") + + def host_active_verified_findings(self): + from dojo.models import Finding # noqa: PLC0415 -- lazy import, avoids circular dependency + return Finding.objects.filter( + active=True, + verified=True, + out_of_scope=False, + mitigated__isnull=True, + false_p=False, + duplicate=False, + status_finding__false_positive=False, + status_finding__out_of_scope=False, + status_finding__risk_accepted=False, + endpoints__in=self.host_endpoints(), + ).order_by("numerical_severity") + + @property + def host_active_findings_count(self): + return self.host_active_findings().count() + + @property + def host_active_verified_findings_count(self): + return self.host_active_verified_findings().count() + + def get_breadcrumbs(self): + bc = self.product.get_breadcrumbs() + bc += [{"title": self.host, + "url": reverse("view_endpoint", args=(self.id,))}] + return bc + + @staticmethod + def from_uri(uri): + try: + url = hyperlink.parse(url=uri) + except UnicodeDecodeError: + url = hyperlink.parse(url="//" + urlparse(uri).netloc) + except hyperlink.URLParseError as e: + msg = f"Invalid URL format: {e}" + raise ValidationError(msg) + + query_parts = [] # inspired by https://github.com/python-hyper/hyperlink/blob/b8c9152cd826bbe8e6cc125648f3738235019705/src/hyperlink/_url.py#L1768 + for k, v in url.query: + if v is None: + query_parts.append(k) + else: + query_parts.append(f"{k}={v}") + query_string = "&".join(query_parts) + + protocol = url.scheme or None + userinfo = ":".join(url.userinfo) if url.userinfo not in {(), ("",)} else None + host = url.host or None + port = url.port + path = "/".join(url.path)[:500] if url.path not in {None, (), ("",)} else None + query = query_string[:1000] if query_string is not None and query_string else None + fragment = url.fragment[:500] if url.fragment is not None and url.fragment else None + + return Endpoint( + protocol=protocol, + userinfo=userinfo, + host=host, + port=port, + path=path, + query=query, + fragment=fragment, + ) diff --git a/dojo/endpoint/signals.py b/dojo/endpoint/signals.py index aebc348c003..58f5e686d15 100644 --- a/dojo/endpoint/signals.py +++ b/dojo/endpoint/signals.py @@ -7,7 +7,7 @@ from django.urls import reverse from django.utils.translation import gettext as _ -from dojo.models import Endpoint +from dojo.endpoint.models import Endpoint from dojo.notifications.helper import create_notification from dojo.pghistory_models import DojoEvents diff --git a/dojo/endpoint/ui/__init__.py b/dojo/endpoint/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/endpoint/ui/filters.py b/dojo/endpoint/ui/filters.py new file mode 100644 index 00000000000..a4ccb35cce1 --- /dev/null +++ b/dojo/endpoint/ui/filters.py @@ -0,0 +1,260 @@ +from django.forms import HiddenInput +from django_filters import ( + CharFilter, + FilterSet, + ModelMultipleChoiceFilter, + NumberFilter, + OrderingFilter, +) + +from dojo.endpoint.models import Endpoint +from dojo.endpoint.queries import get_authorized_endpoints_for_queryset +from dojo.filters import DojoFilter +from dojo.labels import get_labels +from dojo.models import Engagement, Finding, Product, Test +from dojo.product.queries import get_authorized_products + +labels = get_labels() + + +class EndpointFilterHelper(FilterSet): + protocol = CharFilter(lookup_expr="icontains") + userinfo = CharFilter(lookup_expr="icontains") + host = CharFilter(lookup_expr="icontains") + port = NumberFilter() + path = CharFilter(lookup_expr="icontains") + query = CharFilter(lookup_expr="icontains") + fragment = CharFilter(lookup_expr="icontains") + tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") + not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) + has_tags = CharFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") + o = OrderingFilter( + # tuple-mapping retains order + fields=( + ("product", "product"), + ("host", "host"), + ("id", "id"), + ("active_finding_count", "active_finding_count"), + ), + field_labels={ + "active_finding_count": "Active Findings Count", + }, + ) + + +class EndpointFilter(EndpointFilterHelper, DojoFilter): + product = ModelMultipleChoiceFilter( + queryset=Product.objects.none(), + label=labels.ASSET_FILTERS_LABEL) + tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + label="Endpoint Tags", + queryset=Endpoint.tags.tag_model.objects.all().order_by("name")) + findings__tags = ModelMultipleChoiceFilter( + field_name="findings__tags__name", + to_field_name="name", + label="Finding Tags", + queryset=Finding.tags.tag_model.objects.all().order_by("name")) + findings__test__tags = ModelMultipleChoiceFilter( + field_name="findings__test__tags__name", + to_field_name="name", + label="Test Tags", + queryset=Test.tags.tag_model.objects.all().order_by("name")) + findings__test__engagement__tags = ModelMultipleChoiceFilter( + field_name="findings__test__engagement__tags__name", + to_field_name="name", + label="Engagement Tags", + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + findings__test__engagement__product__tags = ModelMultipleChoiceFilter( + field_name="findings__test__engagement__product__tags__name", + to_field_name="name", + label=labels.ASSET_FILTERS_TAGS_ASSET_LABEL, + queryset=Product.tags.tag_model.objects.all().order_by("name")) + not_tags = ModelMultipleChoiceFilter( + field_name="tags__name", + to_field_name="name", + label="Not Endpoint Tags", + exclude=True, + queryset=Endpoint.tags.tag_model.objects.all().order_by("name")) + not_findings__tags = ModelMultipleChoiceFilter( + field_name="findings__tags__name", + to_field_name="name", + label="Not Finding Tags", + exclude=True, + queryset=Finding.tags.tag_model.objects.all().order_by("name")) + not_findings__test__tags = ModelMultipleChoiceFilter( + field_name="findings__test__tags__name", + to_field_name="name", + label="Not Test Tags", + exclude=True, + queryset=Test.tags.tag_model.objects.all().order_by("name")) + not_findings__test__engagement__tags = ModelMultipleChoiceFilter( + field_name="findings__test__engagement__tags__name", + to_field_name="name", + label="Not Engagement Tags", + exclude=True, + queryset=Engagement.tags.tag_model.objects.all().order_by("name")) + not_findings__test__engagement__product__tags = ModelMultipleChoiceFilter( + field_name="findings__test__engagement__product__tags__name", + to_field_name="name", + label=labels.ASSET_FILTERS_NOT_TAGS_ASSET_LABEL, + exclude=True, + queryset=Product.tags.tag_model.objects.all().order_by("name")) + + def __init__(self, *args, **kwargs): + self.user = None + if "user" in kwargs: + self.user = kwargs.pop("user") + super().__init__(*args, **kwargs) + self.form.fields["product"].queryset = get_authorized_products("view") + + @property + def qs(self): + parent = super().qs + return get_authorized_endpoints_for_queryset("view", parent) + + class Meta: + model = Endpoint + exclude = ["findings", "inherited_tags"] + + +class EndpointFilterWithoutObjectLookups(EndpointFilterHelper): + product = NumberFilter(widget=HiddenInput()) + product__name = CharFilter( + field_name="product__name", + lookup_expr="iexact", + label=labels.ASSET_FILTERS_NAME_LABEL, + help_text=labels.ASSET_FILTERS_NAME_HELP) + product__name_contains = CharFilter( + field_name="product__name", + lookup_expr="icontains", + label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL, + help_text=labels.ASSET_FILTERS_NAME_CONTAINS_HELP) + + tags_contains = CharFilter( + label="Endpoint Tag Contains", + field_name="tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Endpoint that contain a given pattern") + tags = CharFilter( + label="Endpoint Tag", + field_name="tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Endpoint that are an exact match") + findings__tags_contains = CharFilter( + label="Finding Tag Contains", + field_name="findings__tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Finding that contain a given pattern") + findings__tags = CharFilter( + label="Finding Tag", + field_name="findings__tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Finding that are an exact match") + findings__test__tags_contains = CharFilter( + label="Test Tag Contains", + field_name="findings__test__tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Finding that contain a given pattern") + findings__test__tags = CharFilter( + label="Test Tag", + field_name="findings__test__tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Finding that are an exact match") + findings__test__engagement__tags_contains = CharFilter( + label="Engagement Tag Contains", + field_name="findings__test__engagement__tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Finding that contain a given pattern") + findings__test__engagement__tags = CharFilter( + label="Engagement Tag", + field_name="findings__test__engagement__tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Finding that are an exact match") + findings__test__engagement__product__tags_contains = CharFilter( + label=labels.ASSET_FILTERS_TAG_ASSET_CONTAINS_LABEL, + field_name="findings__test__engagement__product__tags__name", + lookup_expr="icontains", + help_text=labels.ASSET_FILTERS_TAG_ASSET_CONTAINS_HELP) + findings__test__engagement__product__tags = CharFilter( + label=labels.ASSET_FILTERS_TAG_ASSET_LABEL, + field_name="findings__test__engagement__product__tags__name", + lookup_expr="iexact", + help_text=labels.ASSET_FILTERS_TAG_ASSET_HELP) + + not_tags_contains = CharFilter( + label="Endpoint Tag Does Not Contain", + field_name="tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Endpoint that contain a given pattern, and exclude them", + exclude=True) + not_tags = CharFilter( + label="Not Endpoint Tag", + field_name="tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Endpoint that are an exact match, and exclude them", + exclude=True) + not_findings__tags_contains = CharFilter( + label="Finding Tag Does Not Contain", + field_name="findings__tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Finding that contain a given pattern, and exclude them", + exclude=True) + not_findings__tags = CharFilter( + label="Not Finding Tag", + field_name="findings__tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Finding that are an exact match, and exclude them", + exclude=True) + not_findings__test__tags_contains = CharFilter( + label="Test Tag Does Not Contain", + field_name="findings__test__tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Test that contain a given pattern, and exclude them", + exclude=True) + not_findings__test__tags = CharFilter( + label="Not Test Tag", + field_name="findings__test__tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Test that are an exact match, and exclude them", + exclude=True) + not_findings__test__engagement__tags_contains = CharFilter( + label="Engagement Tag Does Not Contain", + field_name="findings__test__engagement__tags__name", + lookup_expr="icontains", + help_text="Search for tags on a Engagement that contain a given pattern, and exclude them", + exclude=True) + not_findings__test__engagement__tags = CharFilter( + label="Not Engagement Tag", + field_name="findings__test__engagement__tags__name", + lookup_expr="iexact", + help_text="Search for tags on a Engagement that are an exact match, and exclude them", + exclude=True) + not_findings__test__engagement__product__tags_contains = CharFilter( + label=labels.ASSET_FILTERS_TAG_NOT_CONTAIN_LABEL, + field_name="findings__test__engagement__product__tags__name", + lookup_expr="icontains", + help_text=labels.ASSET_FILTERS_TAG_NOT_CONTAIN_HELP, + exclude=True) + not_findings__test__engagement__product__tags = CharFilter( + label=labels.ASSET_FILTERS_TAG_NOT_LABEL, + field_name="findings__test__engagement__product__tags__name", + lookup_expr="iexact", + help_text=labels.ASSET_FILTERS_TAG_NOT_HELP, + exclude=True) + + def __init__(self, *args, **kwargs): + self.user = None + if "user" in kwargs: + self.user = kwargs.pop("user") + super().__init__(*args, **kwargs) + + @property + def qs(self): + parent = super().qs + return get_authorized_endpoints_for_queryset("view", parent) + + class Meta: + model = Endpoint + exclude = ["findings", "inherited_tags", "product"] diff --git a/dojo/endpoint/ui/forms.py b/dojo/endpoint/ui/forms.py new file mode 100644 index 00000000000..625cfc09e41 --- /dev/null +++ b/dojo/endpoint/ui/forms.py @@ -0,0 +1,164 @@ +from django import forms +from tagulous.forms import TagField + +from dojo.endpoint.models import Endpoint +from dojo.endpoint.utils import endpoint_filter, endpoint_get_or_create, validate_endpoints_to_add +from dojo.labels import get_labels +from dojo.models import Finding, Product +from dojo.product.queries import get_authorized_products +from dojo.validators import tag_validator + +labels = get_labels() + + +class ImportEndpointMetaForm(forms.Form): + file = forms.FileField(widget=forms.widgets.FileInput( + attrs={"accept": ".csv"}), + label="Choose meta file", + required=True) # Could not get required=True to actually accept the file as present + create_endpoints = forms.BooleanField( + label="Create nonexisting Endpoint", + initial=True, + required=False, + help_text="Create endpoints that do not already exist") + create_tags = forms.BooleanField( + label="Add Tags", + initial=True, + required=False, + help_text="Add meta from file as tags in the format key:value") + create_dojo_meta = forms.BooleanField( + label="Add Meta", + initial=False, + required=False, + help_text="Add data from file as Metadata. Metadata is used for displaying custom fields") + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class EditEndpointForm(forms.ModelForm): + class Meta: + model = Endpoint + exclude = ["product", "inherited_tags"] + + def __init__(self, *args, **kwargs): + self.product = None + self.endpoint_instance = None + super().__init__(*args, **kwargs) + if "instance" in kwargs: + self.endpoint_instance = kwargs.pop("instance") + self.product = self.endpoint_instance.product + product_id = self.endpoint_instance.product.pk + findings = Finding.objects.filter(test__engagement__product__id=product_id) + self.fields["findings"].queryset = findings + + def clean(self): + + cleaned_data = super().clean() + + protocol = cleaned_data["protocol"] + userinfo = cleaned_data["userinfo"] + host = cleaned_data["host"] + port = cleaned_data["port"] + path = cleaned_data["path"] + query = cleaned_data["query"] + fragment = cleaned_data["fragment"] + + endpoint = endpoint_filter( + protocol=protocol, + userinfo=userinfo, + host=host, + port=port, + path=path, + query=query, + fragment=fragment, + product=self.product, + ) + if endpoint.count() > 1 or (endpoint.count() == 1 and endpoint.first().pk != self.endpoint_instance.pk): + msg = "It appears as though an endpoint with this data already exists for this product." + raise forms.ValidationError(msg, code="invalid") + + return cleaned_data + + def clean_tags(self): + tag_validator(self.cleaned_data.get("tags")) + return self.cleaned_data.get("tags") + + +class AddEndpointForm(forms.Form): + endpoint = forms.CharField(max_length=5000, required=True, label="Endpoint(s)", + help_text="The IP address, host name or full URL. You may enter one endpoint per line. " + "Each must be valid.", + widget=forms.widgets.Textarea(attrs={"rows": "15", "cols": "400"})) + product = forms.CharField(required=True, + label=labels.ASSET_LABEL, help_text=labels.ASSET_ENDPOINT_HELP, + widget=forms.widgets.HiddenInput()) + tags = TagField(required=False, + help_text="Add tags that help describe this endpoint. " + "Choose from the list or add new tags. Press Enter key to add.") + + def __init__(self, *args, **kwargs): + product = None + if "product" in kwargs: + product = kwargs.pop("product") + super().__init__(*args, **kwargs) + self.fields["product"] = forms.ModelChoiceField( + queryset=get_authorized_products("add"), + label=labels.ASSET_LABEL, + help_text=labels.ASSET_ENDPOINT_HELP) + if product is not None: + self.fields["product"].initial = product.id + + self.product = product + self.endpoints_to_process = [] + + def save(self): + processed_endpoints = [] + for e in self.endpoints_to_process: + endpoint, _created = endpoint_get_or_create( + protocol=e[0], + userinfo=e[1], + host=e[2], + port=e[3], + path=e[4], + query=e[5], + fragment=e[6], + product=self.product, + ) + processed_endpoints.append(endpoint) + return processed_endpoints + + def clean(self): + + cleaned_data = super().clean() + + if "endpoint" in cleaned_data and "product" in cleaned_data: + endpoint = cleaned_data["endpoint"] + product = cleaned_data["product"] + if isinstance(product, Product): + self.product = product + else: + self.product = Product.objects.get(id=int(product)) + else: + msg = "Please enter a valid URL or IP address." + raise forms.ValidationError(msg, code="invalid") + + endpoints_to_add_list, errors = validate_endpoints_to_add(endpoint) + if errors: + raise forms.ValidationError(errors) + self.endpoints_to_process = endpoints_to_add_list + + return cleaned_data + + def clean_tags(self): + tag_validator(self.cleaned_data.get("tags")) + return self.cleaned_data.get("tags") + + +class DeleteEndpointForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Endpoint + fields = ["id"] diff --git a/dojo/endpoint/urls.py b/dojo/endpoint/ui/urls.py similarity index 98% rename from dojo/endpoint/urls.py rename to dojo/endpoint/ui/urls.py index 94f6fbdcdb7..4b92af3e7b6 100644 --- a/dojo/endpoint/urls.py +++ b/dojo/endpoint/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from dojo.endpoint import views +from dojo.endpoint.ui import views urlpatterns = [ # endpoints diff --git a/dojo/endpoint/views.py b/dojo/endpoint/ui/views.py similarity index 99% rename from dojo/endpoint/views.py rename to dojo/endpoint/ui/views.py index 74c922f9ea7..531f21cffd5 100644 --- a/dojo/endpoint/views.py +++ b/dojo/endpoint/ui/views.py @@ -18,8 +18,8 @@ from dojo.authorization.authorization import user_has_permission_or_403 from dojo.celery_dispatch import dojo_dispatch_task from dojo.endpoint.queries import get_authorized_endpoints_for_queryset +from dojo.endpoint.ui.filters import EndpointFilter, EndpointFilterWithoutObjectLookups from dojo.endpoint.utils import clean_hosts_run, endpoint_meta_import -from dojo.filters import EndpointFilter, EndpointFilterWithoutObjectLookups from dojo.forms import ( AddEndpointForm, DeleteEndpointForm, diff --git a/dojo/filters.py b/dojo/filters.py index 43f588a4503..8162da4e0c5 100644 --- a/dojo/filters.py +++ b/dojo/filters.py @@ -10,7 +10,6 @@ from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.db.models import Count, Q -from django.forms import HiddenInput from django.utils.timezone import now, tzinfo from django.utils.translation import gettext_lazy as _ from django_filters import ( @@ -30,7 +29,6 @@ # from tagulous.forms import TagWidget # import tagulous -from dojo.endpoint.queries import get_authorized_endpoints_for_queryset from dojo.engagement.queries import get_authorized_engagements from dojo.finding.helper import ( ACCEPTED_FINDINGS_QUERY, @@ -65,7 +63,6 @@ TextQuestion, Vulnerability_Id, ) -from dojo.product.queries import get_authorized_products from dojo.product_type.queries import get_authorized_product_types from dojo.utils import get_system_setting, is_finding_groups_enabled, truncate_timezone_aware @@ -1284,281 +1281,6 @@ class Meta: exclude = ["last_modified", "endpoint", "finding"] -class EndpointFilterHelper(FilterSet): - protocol = CharFilter(lookup_expr="icontains") - userinfo = CharFilter(lookup_expr="icontains") - host = CharFilter(lookup_expr="icontains") - port = NumberFilter() - path = CharFilter(lookup_expr="icontains") - query = CharFilter(lookup_expr="icontains") - fragment = CharFilter(lookup_expr="icontains") - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Tag name contains") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", label="Not tag name contains", exclude=True) - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("product", "product"), - ("host", "host"), - ("id", "id"), - ("active_finding_count", "active_finding_count"), - ), - field_labels={ - "active_finding_count": "Active Findings Count", - }, - ) - - -class EndpointFilter(EndpointFilterHelper, DojoFilter): - product = ModelMultipleChoiceFilter( - queryset=Product.objects.none(), - label=labels.ASSET_FILTERS_LABEL) - tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - label="Endpoint Tags", - queryset=Endpoint.tags.tag_model.objects.all().order_by("name")) - findings__tags = ModelMultipleChoiceFilter( - field_name="findings__tags__name", - to_field_name="name", - label="Finding Tags", - queryset=Finding.tags.tag_model.objects.all().order_by("name")) - findings__test__tags = ModelMultipleChoiceFilter( - field_name="findings__test__tags__name", - to_field_name="name", - label="Test Tags", - queryset=Test.tags.tag_model.objects.all().order_by("name")) - findings__test__engagement__tags = ModelMultipleChoiceFilter( - field_name="findings__test__engagement__tags__name", - to_field_name="name", - label="Engagement Tags", - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - findings__test__engagement__product__tags = ModelMultipleChoiceFilter( - field_name="findings__test__engagement__product__tags__name", - to_field_name="name", - label=labels.ASSET_FILTERS_TAGS_ASSET_LABEL, - queryset=Product.tags.tag_model.objects.all().order_by("name")) - not_tags = ModelMultipleChoiceFilter( - field_name="tags__name", - to_field_name="name", - label="Not Endpoint Tags", - exclude=True, - queryset=Endpoint.tags.tag_model.objects.all().order_by("name")) - not_findings__tags = ModelMultipleChoiceFilter( - field_name="findings__tags__name", - to_field_name="name", - label="Not Finding Tags", - exclude=True, - queryset=Finding.tags.tag_model.objects.all().order_by("name")) - not_findings__test__tags = ModelMultipleChoiceFilter( - field_name="findings__test__tags__name", - to_field_name="name", - label="Not Test Tags", - exclude=True, - queryset=Test.tags.tag_model.objects.all().order_by("name")) - not_findings__test__engagement__tags = ModelMultipleChoiceFilter( - field_name="findings__test__engagement__tags__name", - to_field_name="name", - label="Not Engagement Tags", - exclude=True, - queryset=Engagement.tags.tag_model.objects.all().order_by("name")) - not_findings__test__engagement__product__tags = ModelMultipleChoiceFilter( - field_name="findings__test__engagement__product__tags__name", - to_field_name="name", - label=labels.ASSET_FILTERS_NOT_TAGS_ASSET_LABEL, - exclude=True, - queryset=Product.tags.tag_model.objects.all().order_by("name")) - - def __init__(self, *args, **kwargs): - self.user = None - if "user" in kwargs: - self.user = kwargs.pop("user") - super().__init__(*args, **kwargs) - self.form.fields["product"].queryset = get_authorized_products("view") - - @property - def qs(self): - parent = super().qs - return get_authorized_endpoints_for_queryset("view", parent) - - class Meta: - model = Endpoint - exclude = ["findings", "inherited_tags"] - - -class EndpointFilterWithoutObjectLookups(EndpointFilterHelper): - product = NumberFilter(widget=HiddenInput()) - product__name = CharFilter( - field_name="product__name", - lookup_expr="iexact", - label=labels.ASSET_FILTERS_NAME_LABEL, - help_text=labels.ASSET_FILTERS_NAME_HELP) - product__name_contains = CharFilter( - field_name="product__name", - lookup_expr="icontains", - label=labels.ASSET_FILTERS_NAME_CONTAINS_LABEL, - help_text=labels.ASSET_FILTERS_NAME_CONTAINS_HELP) - - tags_contains = CharFilter( - label="Endpoint Tag Contains", - field_name="tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Endpoint that contain a given pattern") - tags = CharFilter( - label="Endpoint Tag", - field_name="tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Endpoint that are an exact match") - findings__tags_contains = CharFilter( - label="Finding Tag Contains", - field_name="findings__tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Finding that contain a given pattern") - findings__tags = CharFilter( - label="Finding Tag", - field_name="findings__tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Finding that are an exact match") - findings__test__tags_contains = CharFilter( - label="Test Tag Contains", - field_name="findings__test__tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Finding that contain a given pattern") - findings__test__tags = CharFilter( - label="Test Tag", - field_name="findings__test__tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Finding that are an exact match") - findings__test__engagement__tags_contains = CharFilter( - label="Engagement Tag Contains", - field_name="findings__test__engagement__tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Finding that contain a given pattern") - findings__test__engagement__tags = CharFilter( - label="Engagement Tag", - field_name="findings__test__engagement__tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Finding that are an exact match") - findings__test__engagement__product__tags_contains = CharFilter( - label=labels.ASSET_FILTERS_TAG_ASSET_CONTAINS_LABEL, - field_name="findings__test__engagement__product__tags__name", - lookup_expr="icontains", - help_text=labels.ASSET_FILTERS_TAG_ASSET_CONTAINS_HELP) - findings__test__engagement__product__tags = CharFilter( - label=labels.ASSET_FILTERS_TAG_ASSET_LABEL, - field_name="findings__test__engagement__product__tags__name", - lookup_expr="iexact", - help_text=labels.ASSET_FILTERS_TAG_ASSET_HELP) - - not_tags_contains = CharFilter( - label="Endpoint Tag Does Not Contain", - field_name="tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Endpoint that contain a given pattern, and exclude them", - exclude=True) - not_tags = CharFilter( - label="Not Endpoint Tag", - field_name="tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Endpoint that are an exact match, and exclude them", - exclude=True) - not_findings__tags_contains = CharFilter( - label="Finding Tag Does Not Contain", - field_name="findings__tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Finding that contain a given pattern, and exclude them", - exclude=True) - not_findings__tags = CharFilter( - label="Not Finding Tag", - field_name="findings__tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Finding that are an exact match, and exclude them", - exclude=True) - not_findings__test__tags_contains = CharFilter( - label="Test Tag Does Not Contain", - field_name="findings__test__tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Test that contain a given pattern, and exclude them", - exclude=True) - not_findings__test__tags = CharFilter( - label="Not Test Tag", - field_name="findings__test__tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Test that are an exact match, and exclude them", - exclude=True) - not_findings__test__engagement__tags_contains = CharFilter( - label="Engagement Tag Does Not Contain", - field_name="findings__test__engagement__tags__name", - lookup_expr="icontains", - help_text="Search for tags on a Engagement that contain a given pattern, and exclude them", - exclude=True) - not_findings__test__engagement__tags = CharFilter( - label="Not Engagement Tag", - field_name="findings__test__engagement__tags__name", - lookup_expr="iexact", - help_text="Search for tags on a Engagement that are an exact match, and exclude them", - exclude=True) - not_findings__test__engagement__product__tags_contains = CharFilter( - label=labels.ASSET_FILTERS_TAG_NOT_CONTAIN_LABEL, - field_name="findings__test__engagement__product__tags__name", - lookup_expr="icontains", - help_text=labels.ASSET_FILTERS_TAG_NOT_CONTAIN_HELP, - exclude=True) - not_findings__test__engagement__product__tags = CharFilter( - label=labels.ASSET_FILTERS_TAG_NOT_LABEL, - field_name="findings__test__engagement__product__tags__name", - lookup_expr="iexact", - help_text=labels.ASSET_FILTERS_TAG_NOT_HELP, - exclude=True) - - def __init__(self, *args, **kwargs): - self.user = None - if "user" in kwargs: - self.user = kwargs.pop("user") - super().__init__(*args, **kwargs) - - @property - def qs(self): - parent = super().qs - return get_authorized_endpoints_for_queryset("view", parent) - - class Meta: - model = Endpoint - exclude = ["findings", "inherited_tags", "product"] - - -class ApiEndpointFilter(DojoFilter): - tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Tag name contains") - tags = CharFieldInFilter( - field_name="tags__name", - lookup_expr="in", - help_text="Comma separated list of exact tags (uses OR for multiple values)") - tags__and = CharFieldFilterANDExpression( - field_name="tags__name", - help_text="Comma separated list of exact tags to match with an AND expression") - not_tag = CharFilter(field_name="tags__name", lookup_expr="icontains", help_text="Not Tag name contains", exclude="True") - not_tags = CharFieldInFilter(field_name="tags__name", lookup_expr="in", - help_text="Comma separated list of exact tags not present on model", exclude="True") - has_tags = BooleanFilter(field_name="tags", lookup_expr="isnull", exclude=True, label="Has tags") - - o = OrderingFilter( - # tuple-mapping retains order - fields=( - ("host", "host"), - ("product", "product"), - ("id", "id"), - ("active_finding_count", "active_finding_count"), - ), - field_labels={ - "active_finding_count": "Active Findings Count", - }, - ) - - class Meta: - model = Endpoint - fields = ["id", "protocol", "userinfo", "host", "port", "path", "query", "fragment", "product"] - - class ApiRiskAcceptanceFilter(DojoFilter): created = DateRangeFilter() updated = DateRangeFilter() diff --git a/dojo/forms.py b/dojo/forms.py index 57ce9e0f53b..e07dfb9517c 100644 --- a/dojo/forms.py +++ b/dojo/forms.py @@ -15,7 +15,6 @@ from django.contrib.auth.models import Permission from django.contrib.auth.password_validation import validate_password from django.core.exceptions import ValidationError -from django.core.validators import URLValidator from django.db.models import Count from django.forms import modelformset_factory from django.forms.widgets import Select, Widget @@ -26,7 +25,7 @@ from polymorphic.base import ManagerInheritanceWarning from tagulous.forms import TagField -from dojo.endpoint.utils import endpoint_filter, endpoint_get_or_create, validate_endpoints_to_add +from dojo.endpoint.utils import validate_endpoints_to_add from dojo.finding.queries import get_authorized_findings from dojo.github.ui.forms import ( # noqa: F401 -- backward compat DeleteGITHUBConfForm, @@ -76,7 +75,6 @@ Note_Type, Notes, Objects_Product, - Product, Product_API_Scan_Configuration, Product_Type, Question, @@ -86,12 +84,8 @@ Test_Type, TextAnswer, TextQuestion, - Tool_Configuration, - Tool_Product_Settings, - Tool_Type, User, ) -from dojo.product.queries import get_authorized_products from dojo.product_type.queries import get_authorized_product_types from dojo.tools.factory import get_choices_sorted, requires_file, requires_tool_type from dojo.user.queries import get_authorized_users @@ -576,29 +570,12 @@ def clean_scan_date(self): return date -class ImportEndpointMetaForm(forms.Form): - file = forms.FileField(widget=forms.widgets.FileInput( - attrs={"accept": ".csv"}), - label="Choose meta file", - required=True) # Could not get required=True to actually accept the file as present - create_endpoints = forms.BooleanField( - label="Create nonexisting Endpoint", - initial=True, - required=False, - help_text="Create endpoints that do not already exist") - create_tags = forms.BooleanField( - label="Add Tags", - initial=True, - required=False, - help_text="Add meta from file as tags in the format key:value") - create_dojo_meta = forms.BooleanField( - label="Add Meta", - initial=False, - required=False, - help_text="Add data from file as Metadata. Metadata is used for displaying custom fields") - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) +from dojo.endpoint.ui.forms import ( # noqa: E402, F401 -- backward compat re-export + AddEndpointForm, + DeleteEndpointForm, + EditEndpointForm, + ImportEndpointMetaForm, +) class DoneForm(forms.Form): @@ -769,134 +746,6 @@ class Meta: from dojo.test.ui.forms import TestForm # noqa: E402, F401 -- backward compat -class EditEndpointForm(forms.ModelForm): - class Meta: - model = Endpoint - exclude = ["product", "inherited_tags"] - - def __init__(self, *args, **kwargs): - self.product = None - self.endpoint_instance = None - super().__init__(*args, **kwargs) - if "instance" in kwargs: - self.endpoint_instance = kwargs.pop("instance") - self.product = self.endpoint_instance.product - product_id = self.endpoint_instance.product.pk - findings = Finding.objects.filter(test__engagement__product__id=product_id) - self.fields["findings"].queryset = findings - - def clean(self): - - cleaned_data = super().clean() - - protocol = cleaned_data["protocol"] - userinfo = cleaned_data["userinfo"] - host = cleaned_data["host"] - port = cleaned_data["port"] - path = cleaned_data["path"] - query = cleaned_data["query"] - fragment = cleaned_data["fragment"] - - endpoint = endpoint_filter( - protocol=protocol, - userinfo=userinfo, - host=host, - port=port, - path=path, - query=query, - fragment=fragment, - product=self.product, - ) - if endpoint.count() > 1 or (endpoint.count() == 1 and endpoint.first().pk != self.endpoint_instance.pk): - msg = "It appears as though an endpoint with this data already exists for this product." - raise forms.ValidationError(msg, code="invalid") - - return cleaned_data - - def clean_tags(self): - tag_validator(self.cleaned_data.get("tags")) - return self.cleaned_data.get("tags") - - -class AddEndpointForm(forms.Form): - endpoint = forms.CharField(max_length=5000, required=True, label="Endpoint(s)", - help_text="The IP address, host name or full URL. You may enter one endpoint per line. " - "Each must be valid.", - widget=forms.widgets.Textarea(attrs={"rows": "15", "cols": "400"})) - product = forms.CharField(required=True, - label=labels.ASSET_LABEL, help_text=labels.ASSET_ENDPOINT_HELP, - widget=forms.widgets.HiddenInput()) - tags = TagField(required=False, - help_text="Add tags that help describe this endpoint. " - "Choose from the list or add new tags. Press Enter key to add.") - - def __init__(self, *args, **kwargs): - product = None - if "product" in kwargs: - product = kwargs.pop("product") - super().__init__(*args, **kwargs) - self.fields["product"] = forms.ModelChoiceField( - queryset=get_authorized_products("add"), - label=labels.ASSET_LABEL, - help_text=labels.ASSET_ENDPOINT_HELP) - if product is not None: - self.fields["product"].initial = product.id - - self.product = product - self.endpoints_to_process = [] - - def save(self): - processed_endpoints = [] - for e in self.endpoints_to_process: - endpoint, _created = endpoint_get_or_create( - protocol=e[0], - userinfo=e[1], - host=e[2], - port=e[3], - path=e[4], - query=e[5], - fragment=e[6], - product=self.product, - ) - processed_endpoints.append(endpoint) - return processed_endpoints - - def clean(self): - - cleaned_data = super().clean() - - if "endpoint" in cleaned_data and "product" in cleaned_data: - endpoint = cleaned_data["endpoint"] - product = cleaned_data["product"] - if isinstance(product, Product): - self.product = product - else: - self.product = Product.objects.get(id=int(product)) - else: - msg = "Please enter a valid URL or IP address." - raise forms.ValidationError(msg, code="invalid") - - endpoints_to_add_list, errors = validate_endpoints_to_add(endpoint) - if errors: - raise forms.ValidationError(errors) - self.endpoints_to_process = endpoints_to_add_list - - return cleaned_data - - def clean_tags(self): - tag_validator(self.cleaned_data.get("tags")) - return self.cleaned_data.get("tags") - - -class DeleteEndpointForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Endpoint - fields = ["id"] - - class NoteForm(forms.ModelForm): entry = forms.CharField(max_length=2400, widget=forms.Textarea(attrs={"rows": 4, "cols": 15}), label="Notes:") @@ -1120,30 +969,6 @@ class Meta: fields = ["id"] -class ToolTypeForm(forms.ModelForm): - class Meta: - model = Tool_Type - exclude = ["product"] - - def __init__(self, *args, **kwargs): - instance = kwargs.get("instance") - self.newly_created = True - if instance is not None: - self.newly_created = instance.pk is None - super().__init__(*args, **kwargs) - - def clean(self): - form_data = self.cleaned_data - if self.newly_created: - name = form_data.get("name") - # Make sure this will not create a duplicate test type - if Tool_Type.objects.filter(name=name).count() > 0: - msg = "A Tool Type with the name already exists" - raise forms.ValidationError(msg) - - return form_data - - class RegulationForm(forms.ModelForm): class Meta: model = Regulation @@ -1174,28 +999,6 @@ def __init__(self, *args, **kwargs): self.fields["website_found"].disabled = True -class ToolConfigForm(forms.ModelForm): - tool_type = forms.ModelChoiceField(queryset=Tool_Type.objects.all(), label="Tool Type") - ssh = forms.CharField(widget=forms.Textarea(attrs={}), required=False, label="SSH Key") - - class Meta: - model = Tool_Configuration - exclude = ["product"] - - def clean(self): - form_data = self.cleaned_data - - try: - if form_data["url"] is not None: - url_validator = URLValidator(schemes=["ssh", "http", "https"]) - url_validator(form_data["url"]) - except forms.ValidationError: - msg = "It does not appear as though this endpoint is a valid URL/SSH or IP address." - raise forms.ValidationError(msg, code="invalid") - - return form_data - - class SLAConfigForm(forms.ModelForm): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) @@ -1244,38 +1047,6 @@ class Meta: fields = ["id"] -class DeleteToolProductSettingsForm(forms.ModelForm): - id = forms.IntegerField(required=True, - widget=forms.widgets.HiddenInput()) - - class Meta: - model = Tool_Product_Settings - fields = ["id"] - - -class ToolProductSettingsForm(forms.ModelForm): - tool_configuration = forms.ModelChoiceField(queryset=Tool_Configuration.objects.all(), label="Tool Configuration") - - class Meta: - model = Tool_Product_Settings - fields = ["name", "description", "url", "tool_configuration", "tool_project_id"] - exclude = ["tool_type"] - order = ["name"] - - def clean(self): - form_data = self.cleaned_data - - try: - if form_data["url"] is not None: - url_validator = URLValidator(schemes=["ssh", "http", "https"]) - url_validator(form_data["url"]) - except forms.ValidationError: - msg = "It does not appear as though this endpoint is a valid URL/SSH or IP address." - raise forms.ValidationError(msg, code="invalid") - - return form_data - - class ObjectSettingsForm(forms.ModelForm): # tags = forms.CharField(widget=forms.SelectMultiple(choices=[]), diff --git a/dojo/models.py b/dojo/models.py index bec3b2ec4f9..7e6052ffe0c 100644 --- a/dojo/models.py +++ b/dojo/models.py @@ -1,24 +1,18 @@ -import contextlib import copy import logging -import re import warnings from datetime import timedelta from pathlib import Path -from urllib.parse import urlparse from uuid import uuid4 -import hyperlink import tagulous.admin -from django import forms from django.conf import settings from django.contrib import admin from django.contrib.auth import get_user_model from django.core.exceptions import ValidationError from django.core.files.base import ContentFile -from django.core.validators import validate_ipv46_address -from django.db import connection, models -from django.db.models import Count, F, Q +from django.db import models +from django.db.models import Count from django.db.models.expressions import Case, When from django.db.models.functions import Lower from django.urls import reverse @@ -507,77 +501,8 @@ def get_summary(self): return f"{self.name} - Critical: {self.critical}, High: {self.high}, Medium: {self.medium}, Low: {self.low}" -class Tool_Type(models.Model): - name = models.CharField(max_length=200) - description = models.CharField(max_length=2000, null=True, blank=True) - - class Meta: - ordering = ["name"] - - def __str__(self): - return self.name - - -class Tool_Configuration(models.Model): - name = models.CharField(max_length=200, null=False) - description = models.CharField(max_length=2000, null=True, blank=True) - url = models.CharField(max_length=2000, null=True, blank=True) - tool_type = models.ForeignKey(Tool_Type, related_name="tool_type", on_delete=models.CASCADE) - authentication_type = models.CharField(max_length=15, - choices=( - ("API", "API Key"), - ("Password", - "Username/Password"), - ("SSH", "SSH")), - null=True, blank=True) - extras = models.CharField(max_length=255, null=True, blank=True, help_text=_("Additional definitions that will be " - "consumed by scanner")) - username = models.CharField(max_length=200, null=True, blank=True) - password = models.CharField(max_length=600, null=True, blank=True) - auth_title = models.CharField(max_length=200, null=True, blank=True, - verbose_name=_("Title for SSH/API Key")) - ssh = models.CharField(max_length=6000, null=True, blank=True) - api_key = models.CharField(max_length=600, null=True, blank=True, - verbose_name=_("API Key")) - - class Meta: - ordering = ["name"] - - def __str__(self): - return self.name - - -# declare form here as we can't import forms.py due to circular imports not even locally -class ToolConfigForm_Admin(forms.ModelForm): - password = forms.CharField(widget=forms.PasswordInput, required=False) - api_key = forms.CharField(widget=forms.PasswordInput, required=False) - ssh = forms.CharField(widget=forms.PasswordInput, required=False) - - # django doesn't seem to have an easy way to handle password fields as PasswordInput requires reentry of passwords - password_from_db = None - ssh_from_db = None - api_key_from_db = None - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - if self.instance: - # keep password from db to use if the user entered no password - self.password_from_db = self.instance.password - self.ssh_from_db = self.instance.ssh - self.api_key = self.instance.api_key - - def clean(self): - cleaned_data = super().clean() - if not cleaned_data["password"] and not cleaned_data["ssh"] and not cleaned_data["api_key"]: - cleaned_data["password"] = self.password_from_db - cleaned_data["ssh"] = self.ssh_from_db - cleaned_data["api_key"] = self.api_key_from_db - - return cleaned_data - - -class Tool_Configuration_Admin(admin.ModelAdmin): - form = ToolConfigForm_Admin +from dojo.tool_config.models import Tool_Configuration # noqa: E402, F401 -- re-export +from dojo.tool_type.models import Tool_Type # noqa: E402, F401 -- re-export class Network_Locations(models.Model): @@ -587,6 +512,7 @@ def __str__(self): return self.location +from dojo.endpoint.models import Endpoint, Endpoint_Params, Endpoint_Status # noqa: E402, F401 -- re-export from dojo.engagement.models import ( # noqa: E402 -- re-export; class-body FKs below reference these ENGAGEMENT_STATUS_CHOICES, # noqa: F401 -- re-export Engagement, @@ -594,445 +520,6 @@ def __str__(self): ) -class Endpoint_Params(models.Model): - param = models.CharField(max_length=150) - value = models.CharField(max_length=150) - method_type = (("GET", "GET"), - ("POST", "POST")) - method = models.CharField(max_length=20, blank=False, null=True, choices=method_type) - - -class Endpoint_Status(models.Model): - date = models.DateField(default=get_current_date) - last_modified = models.DateTimeField(null=True, editable=False, default=get_current_datetime) - mitigated = models.BooleanField(default=False, blank=True) - mitigated_time = models.DateTimeField(editable=False, null=True, blank=True) - mitigated_by = models.ForeignKey(Dojo_User, editable=True, null=True, on_delete=models.RESTRICT) - false_positive = models.BooleanField(default=False, blank=True) - out_of_scope = models.BooleanField(default=False, blank=True) - risk_accepted = models.BooleanField(default=False, blank=True) - endpoint = models.ForeignKey("Endpoint", null=False, blank=False, on_delete=models.CASCADE, related_name="status_endpoint") - finding = models.ForeignKey("Finding", null=False, blank=False, on_delete=models.CASCADE, related_name="status_finding") - - class Meta: - indexes = [ - models.Index(fields=["finding", "mitigated"]), - models.Index(fields=["endpoint", "mitigated"]), - # Optimize frequent lookups of "active" statuses (mitigated/flags all False) - models.Index( - name="idx_eps_active_by_endpoint", - fields=["endpoint"], - condition=Q(mitigated=False, false_positive=False, out_of_scope=False, risk_accepted=False), - ), - models.Index( - name="idx_eps_active_by_finding", - fields=["finding"], - condition=Q(mitigated=False, false_positive=False, out_of_scope=False, risk_accepted=False), - ), - ] - constraints = [ - models.UniqueConstraint(fields=["finding", "endpoint"], name="endpoint-finding relation"), - ] - - def __str__(self): - with Endpoint.allow_endpoint_init(): # TODO: Delete this after the move to Locations - return f"'{self.finding}' on '{self.endpoint}'" - - def copy(self, finding=None): - copy = copy_model_util(self) - current_endpoint = self.endpoint - if finding: - copy.finding = finding - copy.endpoint = current_endpoint - copy.save() - - return copy - - @property - def age(self): - - diff = self.mitigated_time.date() - self.date if self.mitigated else get_current_date() - self.date - days = diff.days - return max(0, days) - - -class Endpoint(models.Model): - protocol = models.CharField(null=True, blank=True, max_length=20, - help_text=_("The communication protocol/scheme such as 'http', 'ftp', 'dns', etc.")) - userinfo = models.CharField(null=True, blank=True, max_length=500, - help_text=_("User info as 'alice', 'bob', etc.")) - host = models.CharField(null=True, blank=True, max_length=500, - help_text=_("The host name or IP address. It must not include the port number. " - "For example '127.0.0.1', 'localhost', 'yourdomain.com'.")) - port = models.IntegerField(null=True, blank=True, - help_text=_("The network port associated with the endpoint.")) - path = models.CharField(null=True, blank=True, max_length=500, - help_text=_("The location of the resource, it must not start with a '/'. For example " - "endpoint/420/edit")) - query = models.CharField(null=True, blank=True, max_length=1000, - help_text=_("The query string, the question mark should be omitted." - "For example 'group=4&team=8'")) - fragment = models.CharField(null=True, blank=True, max_length=500, - help_text=_("The fragment identifier which follows the hash mark. The hash mark should " - "be omitted. For example 'section-13', 'paragraph-2'.")) - product = models.ForeignKey(Product, null=True, blank=True, on_delete=models.CASCADE) - endpoint_params = models.ManyToManyField(Endpoint_Params, blank=True, editable=False) - findings = models.ManyToManyField("Finding", - blank=True, - verbose_name=_("Findings"), - through=Endpoint_Status) - - tags = TagField(blank=True, force_lowercase=True, help_text=_("Add tags that help describe this endpoint. Choose from the list or add new tags. Press Enter key to add.")) - inherited_tags = TagField(blank=True, force_lowercase=True, help_text=_("Internal use tags sepcifically for maintaining parity with product. This field will be present as a subset in the tags field")) - - class Meta: - ordering = ["product", "host", "protocol", "port", "userinfo", "path", "query", "fragment"] - indexes = [ - models.Index(fields=["product"]), - # Fast case-insensitive equality on host within product scope - models.Index( - F("product"), - Lower("host"), - name="idx_ep_product_lower_host", - ), - ] - - def __init__(self, *args, **kwargs): - if settings.V3_FEATURE_LOCATIONS and not getattr(self, "_allow_v3_init", False): - msg = "Endpoint model is deprecated when V3_FEATURE_LOCATIONS is enabled" - raise NotImplementedError(msg) - super().__init__(*args, **kwargs) - - def __hash__(self): - return self.__str__().__hash__() - - def __eq__(self, other): - if isinstance(other, Endpoint): - contents_match = str(self) == str(other) - # Use product_id (cached integer) instead of self.product to avoid - # triggering a FK lookup on every comparison inside NestedObjects.add_edge. - if self.product_id is not None and other.product_id is not None: - return self.product_id == other.product_id and contents_match - return contents_match - - return NotImplemented - - def __str__(self): - try: - if self.host: - dummy_scheme = "dummy-scheme" # workaround for https://github.com/python-hyper/hyperlink/blob/b8c9152cd826bbe8e6cc125648f3738235019705/src/hyperlink/_url.py#L988 - url = hyperlink.EncodedURL( - scheme=self.protocol or dummy_scheme, - userinfo=self.userinfo or "", - host=self.host, - port=self.port, - path=tuple(self.path.split("/")) if self.path else (), - query=tuple( - ( - qe.split("=", 1) - if "=" in qe - else (qe, None) - ) - for qe in self.query.split("&") - ) if self.query else (), # inspired by https://github.com/python-hyper/hyperlink/blob/b8c9152cd826bbe8e6cc125648f3738235019705/src/hyperlink/_url.py#L1427 - fragment=self.fragment or "", - ) - # Return a normalized version of the URL to avoid differences where there shouldn't be any difference. - # Example: https://google.com and https://google.com:443 - normalize_path = self.path # it used to add '/' at the end of host - clean_url = url.normalize(scheme=True, host=True, path=normalize_path, query=True, fragment=True, userinfo=True, percents=True).to_uri().to_text() - if not self.protocol: - if clean_url[:len(dummy_scheme) + 3] == (dummy_scheme + "://"): - clean_url = clean_url[len(dummy_scheme) + 3:] - else: - msg = "hyperlink lib did not create URL as was expected" - raise ValueError(msg) - return clean_url - msg = "Missing host" - raise ValueError(msg) - except: - url = "" - if self.protocol: - url += f"{self.protocol}://" - if self.userinfo: - url += f"{self.userinfo}@" - if self.host: - url += self.host - if self.port: - url += f":{self.port}" - if self.path: - url += "{}{}".format("/" if self.path[0] != "/" else "", self.path) - if self.query: - url += f"?{self.query}" - if self.fragment: - url += f"#{self.fragment}" - return url - - def get_absolute_url(self): - return reverse("view_endpoint", args=[str(self.id)]) - - @classmethod - @contextlib.contextmanager - def allow_endpoint_init(cls): - # When migrating to Locations, Endpoints are not deleted (hooray backup!). Disallowing the initialization of - # Endpoints is a good way to catch where they might still be used (oops!). However, there are some circumstances - # -- object deletes -- where Django itself attempts to instantiate an Endpoint object. This, we need to allow: - # if a user wants to delete an object, including whatever Endpoints are attached to it, they should be able to. - # This context manager allows code to initialize Endpoints at our discretion. - old = getattr(cls, "_allow_v3_init", None) - cls._allow_v3_init = True - try: - yield - finally: - cls._allow_v3_init = old - - def clean(self): - errors = [] - null_char_list = ["0x00", "\x00"] - db_type = connection.vendor - if self.protocol is not None: - if not re.match(r"^[A-Za-z][A-Za-z0-9\.\-\+]+$", self.protocol): # https://tools.ietf.org/html/rfc3986#section-3.1 - errors.append(ValidationError(f'Protocol "{self.protocol}" has invalid format')) - if not self.protocol: - self.protocol = None - - if self.userinfo is not None: - if not re.match(r"^[A-Za-z0-9\.\-_~%\!\$&\'\(\)\*\+,;=:]+$", self.userinfo): # https://tools.ietf.org/html/rfc3986#section-3.2.1 - errors.append(ValidationError(f'Userinfo "{self.userinfo}" has invalid format')) - if not self.userinfo: - self.userinfo = None - - if self.host: - if not re.match(r"^[A-Za-z0-9_\-\+][A-Za-z0-9_\.\-\+]+$", self.host): - try: - validate_ipv46_address(self.host) - except ValidationError: - errors.append(ValidationError(f'Host "{self.host}" has invalid format')) - else: - errors.append(ValidationError("Host must not be empty")) - - if self.port is not None: - try: - int_port = int(self.port) - if not (0 <= int_port < 65536): - errors.append(ValidationError(f'Port "{self.port}" has invalid format - out of range')) - self.port = int_port - except ValueError: - errors.append(ValidationError(f'Port "{self.port}" has invalid format - it is not a number')) - - if self.path is not None: - while len(self.path) > 0 and self.path[0] == "/": # Endpoint store "root-less" path - self.path = self.path[1:] - if any(null_char in self.path for null_char in null_char_list): - old_value = self.path - if "postgres" in db_type: - action_string = "Postgres does not accept NULL character. Attempting to replace with %00..." - for remove_str in null_char_list: - self.path = self.path.replace(remove_str, "%00") - logger.error('Path "%s" has invalid format - It contains the NULL character. The following action was taken: %s', old_value, action_string) - if not self.path: - self.path = None - - if self.query is not None: - if len(self.query) > 0 and self.query[0] == "?": - self.query = self.query[1:] - if any(null_char in self.query for null_char in null_char_list): - old_value = self.query - if "postgres" in db_type: - action_string = "Postgres does not accept NULL character. Attempting to replace with %00..." - for remove_str in null_char_list: - self.query = self.query.replace(remove_str, "%00") - logger.error('Query "%s" has invalid format - It contains the NULL character. The following action was taken: %s', old_value, action_string) - if not self.query: - self.query = None - - if self.fragment is not None: - if len(self.fragment) > 0 and self.fragment[0] == "#": - self.fragment = self.fragment[1:] - if any(null_char in self.fragment for null_char in null_char_list): - old_value = self.fragment - if "postgres" in db_type: - action_string = "Postgres does not accept NULL character. Attempting to replace with %00..." - for remove_str in null_char_list: - self.fragment = self.fragment.replace(remove_str, "%00") - logger.error('Fragment "%s" has invalid format - It contains the NULL character. The following action was taken: %s', old_value, action_string) - if not self.fragment: - self.fragment = None - - if errors: - raise ValidationError(errors) - - @property - def is_broken(self): - try: - self.clean() - except: - return True - else: - return not self.product - - @property - def mitigated(self): - return not self.vulnerable - - @property - def vulnerable(self): - return Endpoint_Status.objects.filter( - endpoint=self, - mitigated=False, - false_positive=False, - out_of_scope=False, - risk_accepted=False, - ).count() > 0 - - @property - def findings_count(self): - return self.findings.all().count() - - def active_findings(self): - return self.findings.filter( - active=True, - out_of_scope=False, - mitigated__isnull=True, - false_p=False, - duplicate=False, - status_finding__false_positive=False, - status_finding__out_of_scope=False, - status_finding__risk_accepted=False, - ).order_by("numerical_severity") - - def active_verified_findings(self): - return self.findings.filter( - active=True, - verified=True, - out_of_scope=False, - mitigated__isnull=True, - false_p=False, - duplicate=False, - status_finding__false_positive=False, - status_finding__out_of_scope=False, - status_finding__risk_accepted=False, - ).order_by("numerical_severity") - - @property - def active_findings_count(self): - return self.active_findings().count() - - @property - def active_verified_findings_count(self): - return self.active_verified_findings().count() - - def host_endpoints(self): - return Endpoint.objects.filter(host=self.host, - product=self.product).distinct() - - @property - def host_endpoints_count(self): - return self.host_endpoints().count() - - def host_mitigated_endpoints(self): - meps = Endpoint_Status.objects \ - .filter(endpoint__in=self.host_endpoints()) \ - .filter(Q(mitigated=True) - | Q(false_positive=True) - | Q(out_of_scope=True) - | Q(risk_accepted=True) - | Q(finding__out_of_scope=True) - | Q(finding__mitigated__isnull=False) - | Q(finding__false_p=True) - | Q(finding__duplicate=True) - | Q(finding__active=False)) - return Endpoint.objects.filter(status_endpoint__in=meps).distinct() - - @property - def host_mitigated_endpoints_count(self): - return self.host_mitigated_endpoints().count() - - def host_findings(self): - return Finding.objects.filter(endpoints__in=self.host_endpoints()).distinct() - - @property - def host_findings_count(self): - return self.host_findings().count() - - def host_active_findings(self): - return Finding.objects.filter( - active=True, - out_of_scope=False, - mitigated__isnull=True, - false_p=False, - duplicate=False, - status_finding__false_positive=False, - status_finding__out_of_scope=False, - status_finding__risk_accepted=False, - endpoints__in=self.host_endpoints(), - ).order_by("numerical_severity") - - def host_active_verified_findings(self): - return Finding.objects.filter( - active=True, - verified=True, - out_of_scope=False, - mitigated__isnull=True, - false_p=False, - duplicate=False, - status_finding__false_positive=False, - status_finding__out_of_scope=False, - status_finding__risk_accepted=False, - endpoints__in=self.host_endpoints(), - ).order_by("numerical_severity") - - @property - def host_active_findings_count(self): - return self.host_active_findings().count() - - @property - def host_active_verified_findings_count(self): - return self.host_active_verified_findings().count() - - def get_breadcrumbs(self): - bc = self.product.get_breadcrumbs() - bc += [{"title": self.host, - "url": reverse("view_endpoint", args=(self.id,))}] - return bc - - @staticmethod - def from_uri(uri): - try: - url = hyperlink.parse(url=uri) - except UnicodeDecodeError: - url = hyperlink.parse(url="//" + urlparse(uri).netloc) - except hyperlink.URLParseError as e: - msg = f"Invalid URL format: {e}" - raise ValidationError(msg) - - query_parts = [] # inspired by https://github.com/python-hyper/hyperlink/blob/b8c9152cd826bbe8e6cc125648f3738235019705/src/hyperlink/_url.py#L1768 - for k, v in url.query: - if v is None: - query_parts.append(k) - else: - query_parts.append(f"{k}={v}") - query_string = "&".join(query_parts) - - protocol = url.scheme or None - userinfo = ":".join(url.userinfo) if url.userinfo not in {(), ("",)} else None - host = url.host or None - port = url.port - path = "/".join(url.path)[:500] if url.path not in {None, (), ("",)} else None - query = query_string[:1000] if query_string is not None and query_string else None - fragment = url.fragment[:500] if url.fragment is not None and url.fragment else None - - return Endpoint( - protocol=protocol, - userinfo=userinfo, - host=host, - port=port, - path=path, - query=query, - fragment=fragment, - ) - - class Development_Environment(models.Model): name = models.CharField(max_length=200) @@ -1312,28 +799,7 @@ class BannerConf(models.Model): Notification_Webhooks, Notifications, ) - - -class Tool_Product_Settings(models.Model): - name = models.CharField(max_length=200, null=False) - description = models.CharField(max_length=2000, null=True, blank=True) - url = models.CharField(max_length=2000, null=True, blank=True) - product = models.ForeignKey(Product, default=1, editable=False, on_delete=models.CASCADE) - tool_configuration = models.ForeignKey(Tool_Configuration, null=False, - related_name="tool_configuration", on_delete=models.CASCADE) - tool_project_id = models.CharField(max_length=200, null=True, blank=True) - notes = models.ManyToManyField(Notes, blank=True, editable=False) - - class Meta: - ordering = ["name"] - - -class Tool_Product_History(models.Model): - product = models.ForeignKey(Tool_Product_Settings, editable=False, on_delete=models.CASCADE) - last_scan = models.DateTimeField(null=False, editable=False, default=now) - succesfull = models.BooleanField(default=True, verbose_name=_("Succesfully")) - configuration_details = models.CharField(max_length=2000, null=True, - blank=True) +from dojo.tool_product.models import Tool_Product_History, Tool_Product_Settings # noqa: E402, F401 -- re-export class Language_Type(models.Model): @@ -1728,8 +1194,6 @@ def __str__(self): tagulous.admin.register(Finding.inherited_tags) tagulous.admin.register(Engagement.tags) tagulous.admin.register(Engagement.inherited_tags) -tagulous.admin.register(Endpoint.tags) -tagulous.admin.register(Endpoint.inherited_tags) tagulous.admin.register(Finding_Template.tags) tagulous.admin.register(App_Analysis.tags) tagulous.admin.register(Objects_Product.tags) @@ -1755,15 +1219,8 @@ def __str__(self): admin.site.register(FileAccessToken) admin.site.register(Risk_Acceptance) admin.site.register(Check_List) -admin.site.register(Endpoint_Params) -admin.site.register(Endpoint_Status) -admin.site.register(Endpoint) admin.site.register(Notes) admin.site.register(Note_Type) -admin.site.register(Tool_Configuration, Tool_Configuration_Admin) -admin.site.register(Tool_Product_Settings) -admin.site.register(Tool_Type) - admin.site.register(SLA_Configuration) admin.site.register(Regulation) from dojo.authorization.models import ( # noqa: E402 @@ -1798,5 +1255,4 @@ def __str__(self): admin.site.register(Announcement) admin.site.register(UserAnnouncement) admin.site.register(BannerConf) -admin.site.register(Tool_Product_History) admin.site.register(General_Survey) diff --git a/dojo/reports/views.py b/dojo/reports/views.py index ff9049738e9..7372e9a0287 100644 --- a/dojo/reports/views.py +++ b/dojo/reports/views.py @@ -17,9 +17,8 @@ from dojo.authorization.authorization import user_has_permission_or_403 from dojo.authorization.roles_permissions import Permissions from dojo.endpoint.queries import get_authorized_endpoints +from dojo.endpoint.ui.filters import EndpointFilter, EndpointFilterWithoutObjectLookups from dojo.filters import ( - EndpointFilter, - EndpointFilterWithoutObjectLookups, EndpointReportFilter, ) from dojo.finding.queries import get_authorized_findings diff --git a/dojo/reports/widgets.py b/dojo/reports/widgets.py index 2620ecf23d2..07377560539 100644 --- a/dojo/reports/widgets.py +++ b/dojo/reports/widgets.py @@ -12,10 +12,7 @@ from django.utils.html import format_html from django.utils.safestring import mark_safe -from dojo.filters import ( - EndpointFilter, - EndpointFilterWithoutObjectLookups, -) +from dojo.endpoint.ui.filters import EndpointFilter, EndpointFilterWithoutObjectLookups from dojo.finding.ui.filters import ( ReportFindingFilter, ReportFindingFilterWithoutObjectLookups, diff --git a/dojo/search/views.py b/dojo/search/views.py index 13dd70e1ed1..050286c5999 100644 --- a/dojo/search/views.py +++ b/dojo/search/views.py @@ -10,7 +10,7 @@ from watson import search as watson from dojo.endpoint.queries import get_authorized_endpoints -from dojo.endpoint.views import prefetch_for_endpoints +from dojo.endpoint.ui.views import prefetch_for_endpoints from dojo.engagement.queries import get_authorized_engagements from dojo.finding.queries import get_authorized_findings, get_authorized_vulnerability_ids, prefetch_for_findings from dojo.finding.ui.filters import FindingFilter, FindingFilterWithoutObjectLookups diff --git a/dojo/tool_config/__init__.py b/dojo/tool_config/__init__.py index e69de29bb2d..3f63d29bec4 100644 --- a/dojo/tool_config/__init__.py +++ b/dojo/tool_config/__init__.py @@ -0,0 +1 @@ +import dojo.tool_config.admin # noqa: F401 diff --git a/dojo/tool_config/admin.py b/dojo/tool_config/admin.py new file mode 100644 index 00000000000..cb07719c546 --- /dev/null +++ b/dojo/tool_config/admin.py @@ -0,0 +1,40 @@ +from django import forms +from django.contrib import admin + +from dojo.tool_config.models import Tool_Configuration + + +# declare form here as we can't import forms.py due to circular imports not even locally +class ToolConfigForm_Admin(forms.ModelForm): + password = forms.CharField(widget=forms.PasswordInput, required=False) + api_key = forms.CharField(widget=forms.PasswordInput, required=False) + ssh = forms.CharField(widget=forms.PasswordInput, required=False) + + # django doesn't seem to have an easy way to handle password fields as PasswordInput requires reentry of passwords + password_from_db = None + ssh_from_db = None + api_key_from_db = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + if self.instance: + # keep password from db to use if the user entered no password + self.password_from_db = self.instance.password + self.ssh_from_db = self.instance.ssh + self.api_key = self.instance.api_key + + def clean(self): + cleaned_data = super().clean() + if not cleaned_data["password"] and not cleaned_data["ssh"] and not cleaned_data["api_key"]: + cleaned_data["password"] = self.password_from_db + cleaned_data["ssh"] = self.ssh_from_db + cleaned_data["api_key"] = self.api_key_from_db + + return cleaned_data + + +class Tool_Configuration_Admin(admin.ModelAdmin): + form = ToolConfigForm_Admin + + +admin.site.register(Tool_Configuration, Tool_Configuration_Admin) diff --git a/dojo/tool_config/api/__init__.py b/dojo/tool_config/api/__init__.py new file mode 100644 index 00000000000..3f12ce59e94 --- /dev/null +++ b/dojo/tool_config/api/__init__.py @@ -0,0 +1 @@ +path = "tool_configurations" # noqa: RUF067 diff --git a/dojo/tool_config/api/serializer.py b/dojo/tool_config/api/serializer.py new file mode 100644 index 00000000000..f80dc1b782d --- /dev/null +++ b/dojo/tool_config/api/serializer.py @@ -0,0 +1,14 @@ +from rest_framework import serializers + +from dojo.tool_config.models import Tool_Configuration + + +class ToolConfigurationSerializer(serializers.ModelSerializer): + class Meta: + model = Tool_Configuration + fields = "__all__" + extra_kwargs = { + "password": {"write_only": True}, + "ssh": {"write_only": True}, + "api_key": {"write_only": True}, + } diff --git a/dojo/tool_config/api/urls.py b/dojo/tool_config/api/urls.py new file mode 100644 index 00000000000..cc4620473a7 --- /dev/null +++ b/dojo/tool_config/api/urls.py @@ -0,0 +1,6 @@ +from dojo.tool_config.api.views import ToolConfigurationsViewSet + + +def add_tool_config_urls(router): + router.register(r"tool_configurations", ToolConfigurationsViewSet, basename="tool_configuration") + return router diff --git a/dojo/tool_config/api/views.py b/dojo/tool_config/api/views.py new file mode 100644 index 00000000000..91a740610b7 --- /dev/null +++ b/dojo/tool_config/api/views.py @@ -0,0 +1,32 @@ +import logging + +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema_view + +from dojo.api_v2.views import PrefetchDojoModelViewSet, schema_with_prefetch +from dojo.authorization import api_permissions as permissions +from dojo.tool_config.api.serializer import ToolConfigurationSerializer +from dojo.tool_config.models import Tool_Configuration + +logger = logging.getLogger(__name__) + + +# Authorization: configurations +@extend_schema_view(**schema_with_prefetch()) +class ToolConfigurationsViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = ToolConfigurationSerializer + queryset = Tool_Configuration.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "id", + "name", + "tool_type", + "url", + "authentication_type", + ] + permission_classes = (permissions.UserHasConfigurationPermissionSuperuser,) + + def get_queryset(self): + return Tool_Configuration.objects.all().order_by("id") diff --git a/dojo/tool_config/models.py b/dojo/tool_config/models.py new file mode 100644 index 00000000000..6190fe839ce --- /dev/null +++ b/dojo/tool_config/models.py @@ -0,0 +1,31 @@ +from django.db import models +from django.utils.translation import gettext_lazy as _ + + +class Tool_Configuration(models.Model): + name = models.CharField(max_length=200, null=False) + description = models.CharField(max_length=2000, null=True, blank=True) + url = models.CharField(max_length=2000, null=True, blank=True) + tool_type = models.ForeignKey("dojo.Tool_Type", related_name="tool_type", on_delete=models.CASCADE) + authentication_type = models.CharField(max_length=15, + choices=( + ("API", "API Key"), + ("Password", + "Username/Password"), + ("SSH", "SSH")), + null=True, blank=True) + extras = models.CharField(max_length=255, null=True, blank=True, help_text=_("Additional definitions that will be " + "consumed by scanner")) + username = models.CharField(max_length=200, null=True, blank=True) + password = models.CharField(max_length=600, null=True, blank=True) + auth_title = models.CharField(max_length=200, null=True, blank=True, + verbose_name=_("Title for SSH/API Key")) + ssh = models.CharField(max_length=6000, null=True, blank=True) + api_key = models.CharField(max_length=600, null=True, blank=True, + verbose_name=_("API Key")) + + class Meta: + ordering = ["name"] + + def __str__(self): + return self.name diff --git a/dojo/tool_config/ui/__init__.py b/dojo/tool_config/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/tool_config/ui/forms.py b/dojo/tool_config/ui/forms.py new file mode 100644 index 00000000000..cc769725d78 --- /dev/null +++ b/dojo/tool_config/ui/forms.py @@ -0,0 +1,27 @@ +from django import forms +from django.core.validators import URLValidator + +from dojo.tool_config.models import Tool_Configuration +from dojo.tool_type.models import Tool_Type + + +class ToolConfigForm(forms.ModelForm): + tool_type = forms.ModelChoiceField(queryset=Tool_Type.objects.all(), label="Tool Type") + ssh = forms.CharField(widget=forms.Textarea(attrs={}), required=False, label="SSH Key") + + class Meta: + model = Tool_Configuration + exclude = ["product"] + + def clean(self): + form_data = self.cleaned_data + + try: + if form_data["url"] is not None: + url_validator = URLValidator(schemes=["ssh", "http", "https"]) + url_validator(form_data["url"]) + except forms.ValidationError: + msg = "It does not appear as though this endpoint is a valid URL/SSH or IP address." + raise forms.ValidationError(msg, code="invalid") + + return form_data diff --git a/dojo/tool_config/urls.py b/dojo/tool_config/ui/urls.py similarity index 89% rename from dojo/tool_config/urls.py rename to dojo/tool_config/ui/urls.py index 263142742e6..c2d5862e460 100644 --- a/dojo/tool_config/urls.py +++ b/dojo/tool_config/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from . import views +from dojo.tool_config.ui import views urlpatterns = [ re_path(r"^tool_config/add", views.new_tool_config, name="add_tool_config"), diff --git a/dojo/tool_config/views.py b/dojo/tool_config/ui/views.py similarity index 97% rename from dojo/tool_config/views.py rename to dojo/tool_config/ui/views.py index cb32d3203cf..a0807917925 100644 --- a/dojo/tool_config/views.py +++ b/dojo/tool_config/ui/views.py @@ -6,9 +6,9 @@ from django.shortcuts import render from django.urls import reverse -from dojo.forms import ToolConfigForm -from dojo.models import Tool_Configuration from dojo.tool_config.factory import create_API +from dojo.tool_config.models import Tool_Configuration +from dojo.tool_config.ui.forms import ToolConfigForm from dojo.utils import add_breadcrumb, dojo_crypto_encrypt, prepare_for_view logger = logging.getLogger(__name__) diff --git a/dojo/tool_product/__init__.py b/dojo/tool_product/__init__.py index e69de29bb2d..f145962c51d 100644 --- a/dojo/tool_product/__init__.py +++ b/dojo/tool_product/__init__.py @@ -0,0 +1 @@ +import dojo.tool_product.admin # noqa: F401 diff --git a/dojo/tool_product/admin.py b/dojo/tool_product/admin.py new file mode 100644 index 00000000000..19d8890eff1 --- /dev/null +++ b/dojo/tool_product/admin.py @@ -0,0 +1,6 @@ +from django.contrib import admin + +from dojo.tool_product.models import Tool_Product_History, Tool_Product_Settings + +admin.site.register(Tool_Product_Settings) +admin.site.register(Tool_Product_History) diff --git a/dojo/tool_product/api/__init__.py b/dojo/tool_product/api/__init__.py new file mode 100644 index 00000000000..1a68ec15513 --- /dev/null +++ b/dojo/tool_product/api/__init__.py @@ -0,0 +1 @@ +path = "tool_product_settings" # noqa: RUF067 diff --git a/dojo/tool_product/api/serializer.py b/dojo/tool_product/api/serializer.py new file mode 100644 index 00000000000..12aafcdfec7 --- /dev/null +++ b/dojo/tool_product/api/serializer.py @@ -0,0 +1,15 @@ +from rest_framework import serializers + +from dojo.models import Product +from dojo.tool_product.models import Tool_Product_Settings + + +class ToolProductSettingsSerializer(serializers.ModelSerializer): + setting_url = serializers.CharField(source="url") + product = serializers.PrimaryKeyRelatedField( + queryset=Product.objects.all(), required=True, + ) + + class Meta: + model = Tool_Product_Settings + fields = "__all__" diff --git a/dojo/tool_product/api/urls.py b/dojo/tool_product/api/urls.py new file mode 100644 index 00000000000..f3fca7fa2f2 --- /dev/null +++ b/dojo/tool_product/api/urls.py @@ -0,0 +1,6 @@ +from dojo.tool_product.api.views import ToolProductSettingsViewSet + + +def add_tool_product_urls(router): + router.register(r"tool_product_settings", ToolProductSettingsViewSet, basename="tool_product_settings") + return router diff --git a/dojo/tool_product/api/views.py b/dojo/tool_product/api/views.py new file mode 100644 index 00000000000..33a6278841d --- /dev/null +++ b/dojo/tool_product/api/views.py @@ -0,0 +1,38 @@ +import logging + +from django_filters.rest_framework import DjangoFilterBackend +from drf_spectacular.utils import extend_schema_view +from rest_framework.permissions import IsAuthenticated + +from dojo.api_v2.views import PrefetchDojoModelViewSet, schema_with_prefetch +from dojo.authorization import api_permissions as permissions +from dojo.tool_product.api.serializer import ToolProductSettingsSerializer +from dojo.tool_product.models import Tool_Product_Settings +from dojo.tool_product.queries import get_authorized_tool_product_settings + +logger = logging.getLogger(__name__) + + +# Authorization: object-based +@extend_schema_view(**schema_with_prefetch()) +class ToolProductSettingsViewSet( + PrefetchDojoModelViewSet, +): + serializer_class = ToolProductSettingsSerializer + queryset = Tool_Product_Settings.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = [ + "id", + "name", + "product", + "tool_configuration", + "tool_project_id", + "url", + ] + permission_classes = ( + IsAuthenticated, + permissions.UserHasToolProductSettingsPermission, + ) + + def get_queryset(self): + return get_authorized_tool_product_settings("view") diff --git a/dojo/tool_product/models.py b/dojo/tool_product/models.py new file mode 100644 index 00000000000..2f77ea89a6e --- /dev/null +++ b/dojo/tool_product/models.py @@ -0,0 +1,25 @@ +from django.db import models +from django.utils.timezone import now +from django.utils.translation import gettext_lazy as _ + + +class Tool_Product_Settings(models.Model): + name = models.CharField(max_length=200, null=False) + description = models.CharField(max_length=2000, null=True, blank=True) + url = models.CharField(max_length=2000, null=True, blank=True) + product = models.ForeignKey("dojo.Product", default=1, editable=False, on_delete=models.CASCADE) + tool_configuration = models.ForeignKey("dojo.Tool_Configuration", null=False, + related_name="tool_configuration", on_delete=models.CASCADE) + tool_project_id = models.CharField(max_length=200, null=True, blank=True) + notes = models.ManyToManyField("dojo.Notes", blank=True, editable=False) + + class Meta: + ordering = ["name"] + + +class Tool_Product_History(models.Model): + product = models.ForeignKey("dojo.Tool_Product_Settings", editable=False, on_delete=models.CASCADE) + last_scan = models.DateTimeField(null=False, editable=False, default=now) + succesfull = models.BooleanField(default=True, verbose_name=_("Succesfully")) + configuration_details = models.CharField(max_length=2000, null=True, + blank=True) diff --git a/dojo/tool_product/queries.py b/dojo/tool_product/queries.py index 6ae66429fdc..45dd338b5b3 100644 --- a/dojo/tool_product/queries.py +++ b/dojo/tool_product/queries.py @@ -3,8 +3,8 @@ except ImportError: def get_auth_filter(key): return None -from dojo.models import Tool_Product_Settings from dojo.request_cache import cache_for_request +from dojo.tool_product.models import Tool_Product_Settings # Cached: all parameters are hashable, no dynamic queryset filtering diff --git a/dojo/tool_product/signals.py b/dojo/tool_product/signals.py index 96dd881ff45..391189cfb26 100644 --- a/dojo/tool_product/signals.py +++ b/dojo/tool_product/signals.py @@ -3,8 +3,8 @@ from django.db.models.signals import pre_delete from django.dispatch import receiver -from dojo.models import Tool_Product_Settings from dojo.notes.helper import delete_related_notes +from dojo.tool_product.models import Tool_Product_Settings logger = logging.getLogger(__name__) diff --git a/dojo/tool_product/ui/__init__.py b/dojo/tool_product/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/tool_product/ui/forms.py b/dojo/tool_product/ui/forms.py new file mode 100644 index 00000000000..f62dac4bb9c --- /dev/null +++ b/dojo/tool_product/ui/forms.py @@ -0,0 +1,37 @@ +from django import forms +from django.core.validators import URLValidator + +from dojo.tool_config.models import Tool_Configuration +from dojo.tool_product.models import Tool_Product_Settings + + +class DeleteToolProductSettingsForm(forms.ModelForm): + id = forms.IntegerField(required=True, + widget=forms.widgets.HiddenInput()) + + class Meta: + model = Tool_Product_Settings + fields = ["id"] + + +class ToolProductSettingsForm(forms.ModelForm): + tool_configuration = forms.ModelChoiceField(queryset=Tool_Configuration.objects.all(), label="Tool Configuration") + + class Meta: + model = Tool_Product_Settings + fields = ["name", "description", "url", "tool_configuration", "tool_project_id"] + exclude = ["tool_type"] + order = ["name"] + + def clean(self): + form_data = self.cleaned_data + + try: + if form_data["url"] is not None: + url_validator = URLValidator(schemes=["ssh", "http", "https"]) + url_validator(form_data["url"]) + except forms.ValidationError: + msg = "It does not appear as though this endpoint is a valid URL/SSH or IP address." + raise forms.ValidationError(msg, code="invalid") + + return form_data diff --git a/dojo/tool_product/urls.py b/dojo/tool_product/ui/urls.py similarity index 93% rename from dojo/tool_product/urls.py rename to dojo/tool_product/ui/urls.py index 9acc6cdb139..943647d5d49 100644 --- a/dojo/tool_product/urls.py +++ b/dojo/tool_product/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from . import views +from dojo.tool_product.ui import views urlpatterns = [ re_path(r"^product/(?P\d+)/tool_product/add$", views.new_tool_product, name="new_tool_product"), diff --git a/dojo/tool_product/views.py b/dojo/tool_product/ui/views.py similarity index 95% rename from dojo/tool_product/views.py rename to dojo/tool_product/ui/views.py index de142b1bcf8..39afab79e28 100644 --- a/dojo/tool_product/views.py +++ b/dojo/tool_product/ui/views.py @@ -8,8 +8,9 @@ from django.urls import reverse from django.utils.translation import gettext as _ -from dojo.forms import DeleteToolProductSettingsForm, ToolProductSettingsForm -from dojo.models import Product, Tool_Product_Settings +from dojo.models import Product +from dojo.tool_product.models import Tool_Product_Settings +from dojo.tool_product.ui.forms import DeleteToolProductSettingsForm, ToolProductSettingsForm from dojo.utils import Product_Tab logger = logging.getLogger(__name__) diff --git a/dojo/tool_type/__init__.py b/dojo/tool_type/__init__.py index e69de29bb2d..0235ecd395e 100644 --- a/dojo/tool_type/__init__.py +++ b/dojo/tool_type/__init__.py @@ -0,0 +1 @@ +import dojo.tool_type.admin # noqa: F401 diff --git a/dojo/tool_type/admin.py b/dojo/tool_type/admin.py new file mode 100644 index 00000000000..2196308ff07 --- /dev/null +++ b/dojo/tool_type/admin.py @@ -0,0 +1,5 @@ +from django.contrib import admin + +from dojo.tool_type.models import Tool_Type + +admin.site.register(Tool_Type) diff --git a/dojo/tool_type/api/__init__.py b/dojo/tool_type/api/__init__.py new file mode 100644 index 00000000000..7af3572bf6b --- /dev/null +++ b/dojo/tool_type/api/__init__.py @@ -0,0 +1 @@ +path = "tool_types" # noqa: RUF067 diff --git a/dojo/tool_type/api/serializer.py b/dojo/tool_type/api/serializer.py new file mode 100644 index 00000000000..5d056dcb8fc --- /dev/null +++ b/dojo/tool_type/api/serializer.py @@ -0,0 +1,18 @@ +from rest_framework import serializers + +from dojo.tool_type.models import Tool_Type + + +class ToolTypeSerializer(serializers.ModelSerializer): + class Meta: + model = Tool_Type + fields = "__all__" + + def validate(self, data): + if self.context["request"].method == "POST": + name = data.get("name") + # Make sure this will not create a duplicate test type + if Tool_Type.objects.filter(name=name).count() > 0: + msg = "A Tool Type with the name already exists" + raise serializers.ValidationError(msg) + return data diff --git a/dojo/tool_type/api/urls.py b/dojo/tool_type/api/urls.py new file mode 100644 index 00000000000..044c34fe6cc --- /dev/null +++ b/dojo/tool_type/api/urls.py @@ -0,0 +1,6 @@ +from dojo.tool_type.api.views import ToolTypesViewSet + + +def add_tool_type_urls(router): + router.register(r"tool_types", ToolTypesViewSet, basename="tool_type") + return router diff --git a/dojo/tool_type/api/views.py b/dojo/tool_type/api/views.py new file mode 100644 index 00000000000..b84af213ad9 --- /dev/null +++ b/dojo/tool_type/api/views.py @@ -0,0 +1,24 @@ +import logging + +from django_filters.rest_framework import DjangoFilterBackend + +from dojo.api_v2.views import DojoModelViewSet +from dojo.authorization import api_permissions as permissions +from dojo.tool_type.api.serializer import ToolTypeSerializer +from dojo.tool_type.models import Tool_Type + +logger = logging.getLogger(__name__) + + +# Authorization: configuration +class ToolTypesViewSet( + DojoModelViewSet, +): + serializer_class = ToolTypeSerializer + queryset = Tool_Type.objects.none() + filter_backends = (DjangoFilterBackend,) + filterset_fields = ["id", "name", "description"] + permission_classes = (permissions.UserHasConfigurationPermissionSuperuser,) + + def get_queryset(self): + return Tool_Type.objects.all().order_by("id") diff --git a/dojo/tool_type/models.py b/dojo/tool_type/models.py new file mode 100644 index 00000000000..a5c55213d32 --- /dev/null +++ b/dojo/tool_type/models.py @@ -0,0 +1,12 @@ +from django.db import models + + +class Tool_Type(models.Model): + name = models.CharField(max_length=200) + description = models.CharField(max_length=2000, null=True, blank=True) + + class Meta: + ordering = ["name"] + + def __str__(self): + return self.name diff --git a/dojo/tool_type/ui/__init__.py b/dojo/tool_type/ui/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/dojo/tool_type/ui/forms.py b/dojo/tool_type/ui/forms.py new file mode 100644 index 00000000000..8e7ff33f90f --- /dev/null +++ b/dojo/tool_type/ui/forms.py @@ -0,0 +1,27 @@ +from django import forms + +from dojo.tool_type.models import Tool_Type + + +class ToolTypeForm(forms.ModelForm): + class Meta: + model = Tool_Type + exclude = ["product"] + + def __init__(self, *args, **kwargs): + instance = kwargs.get("instance") + self.newly_created = True + if instance is not None: + self.newly_created = instance.pk is None + super().__init__(*args, **kwargs) + + def clean(self): + form_data = self.cleaned_data + if self.newly_created: + name = form_data.get("name") + # Make sure this will not create a duplicate test type + if Tool_Type.objects.filter(name=name).count() > 0: + msg = "A Tool Type with the name already exists" + raise forms.ValidationError(msg) + + return form_data diff --git a/dojo/tool_type/urls.py b/dojo/tool_type/ui/urls.py similarity index 89% rename from dojo/tool_type/urls.py rename to dojo/tool_type/ui/urls.py index 3b79b58d1b5..68d82c15be4 100644 --- a/dojo/tool_type/urls.py +++ b/dojo/tool_type/ui/urls.py @@ -1,6 +1,6 @@ from django.urls import re_path -from . import views +from dojo.tool_type.ui import views urlpatterns = [ re_path(r"^tool_type/add", views.new_tool_type, name="add_tool_type"), diff --git a/dojo/tool_type/views.py b/dojo/tool_type/ui/views.py similarity index 95% rename from dojo/tool_type/views.py rename to dojo/tool_type/ui/views.py index 3f9e8218136..551ed7c0f4e 100644 --- a/dojo/tool_type/views.py +++ b/dojo/tool_type/ui/views.py @@ -7,8 +7,8 @@ from django.urls import reverse from django.utils.translation import gettext as _ -from dojo.forms import ToolTypeForm -from dojo.models import Tool_Type +from dojo.tool_type.models import Tool_Type +from dojo.tool_type.ui.forms import ToolTypeForm from dojo.utils import add_breadcrumb logger = logging.getLogger(__name__) diff --git a/dojo/urls.py b/dojo/urls.py index 7992d87f3f3..706fe9225f3 100644 --- a/dojo/urls.py +++ b/dojo/urls.py @@ -18,9 +18,6 @@ ConfigurationPermissionViewSet, DevelopmentEnvironmentViewSet, DojoMetaViewSet, - EndpointMetaImporterView, - EndpointStatusViewSet, - EndPointViewSet, ImportLanguagesView, ImportScanView, JiraInstanceViewSet, @@ -37,9 +34,6 @@ SLAConfigurationViewset, SonarqubeIssueTransitionViewSet, SonarqubeIssueViewSet, - ToolConfigurationsViewSet, - ToolProductSettingsViewSet, - ToolTypesViewSet, ) from dojo.api_v2.views import DojoSpectacularAPIView as SpectacularAPIView from dojo.asset.api.urls import add_asset_urls @@ -48,7 +42,8 @@ from dojo.benchmark.urls import urlpatterns as benchmark_urls from dojo.components.urls import urlpatterns as component_urls from dojo.development_environment.urls import urlpatterns as dev_env_urls -from dojo.endpoint.urls import urlpatterns as endpoint_urls +from dojo.endpoint.api.urls import add_endpoint_urls, register_endpoint_meta_import +from dojo.endpoint.ui.urls import urlpatterns as endpoint_urls from dojo.engagement.api.urls import add_engagement_urls from dojo.engagement.ui.urls import urlpatterns as eng_urls from dojo.finding.api.urls import add_finding_urls @@ -79,9 +74,12 @@ from dojo.test.api.urls import add_test_urls from dojo.test.ui.urls import urlpatterns as test_urls from dojo.test_type.urls import urlpatterns as test_type_urls -from dojo.tool_config.urls import urlpatterns as tool_config_urls -from dojo.tool_product.urls import urlpatterns as tool_product_urls -from dojo.tool_type.urls import urlpatterns as tool_type_urls +from dojo.tool_config.api.urls import add_tool_config_urls +from dojo.tool_config.ui.urls import urlpatterns as tool_config_urls +from dojo.tool_product.api.urls import add_tool_product_urls +from dojo.tool_product.ui.urls import urlpatterns as tool_product_urls +from dojo.tool_type.api.urls import add_tool_type_urls +from dojo.tool_type.ui.urls import urlpatterns as tool_type_urls from dojo.url.api.urls import add_url_urls from dojo.url.ui.urls import urlpatterns as url_patterns from dojo.user.api.urls import add_user_urls @@ -105,7 +103,7 @@ v2_api.register(r"development_environments", DevelopmentEnvironmentViewSet, basename="development_environment") # RBAC endpoints moved to Pro under legacy authorization: # dojo_groups, dojo_group_members → pro/groups, pro/group_members -v2_api.register(r"endpoint_meta_import", EndpointMetaImporterView, basename="endpointmetaimport") +v2_api = register_endpoint_meta_import(v2_api) # RBAC endpoint moved to Pro under legacy authorization: global_roles → pro/global_roles v2_api.register(r"import-languages", ImportLanguagesView, basename="importlanguages") v2_api.register(r"import-scan", ImportScanView, basename="importscan") @@ -139,9 +137,9 @@ v2_api = add_system_settings_urls(v2_api) v2_api.register(r"technologies", AppAnalysisViewSet, basename="app_analysis") v2_api = add_test_urls(v2_api) -v2_api.register(r"tool_configurations", ToolConfigurationsViewSet, basename="tool_configuration") -v2_api.register(r"tool_product_settings", ToolProductSettingsViewSet, basename="tool_product_settings") -v2_api.register(r"tool_types", ToolTypesViewSet, basename="tool_type") +v2_api = add_tool_config_urls(v2_api) +v2_api = add_tool_product_urls(v2_api) +v2_api = add_tool_type_urls(v2_api) v2_api = add_user_urls(v2_api) # Add the location routes if settings.V3_FEATURE_LOCATIONS: @@ -151,8 +149,7 @@ v2_api.register(r"endpoints", V3EndpointCompatibleViewSet, basename="endpoint") v2_api.register(r"endpoint_status", V3EndpointStatusCompatibleViewSet, basename="endpoint_status") else: - v2_api.register(r"endpoints", EndPointViewSet, basename="endpoint") - v2_api.register(r"endpoint_status", EndpointStatusViewSet, basename="endpoint_status") + v2_api = add_endpoint_urls(v2_api) v2_api.register(r"celery", CeleryViewSet, basename="celery") # V3 add_asset_urls(v2_api) diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 81cb43bac98..4e55f580cc5 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -41,8 +41,6 @@ AppAnalysisViewSet, ConfigurationPermissionViewSet, DevelopmentEnvironmentViewSet, - EndpointStatusViewSet, - EndPointViewSet, ImportLanguagesView, ImportScanView, JiraInstanceViewSet, @@ -54,15 +52,13 @@ NoteTypeViewSet, RiskAcceptanceViewSet, SonarqubeIssueViewSet, - ToolConfigurationsViewSet, - ToolProductSettingsViewSet, - ToolTypesViewSet, ) from dojo.asset.api.views import ( AssetAPIScanConfigurationViewSet, AssetViewSet, ) from dojo.authorization.roles_permissions import Permissions, permission_to_action +from dojo.endpoint.api.views import EndpointStatusViewSet, EndPointViewSet from dojo.engagement.api.views import EngagementViewSet from dojo.finding.api.views import ( BurpRawRequestResponseViewSet, @@ -114,6 +110,9 @@ from dojo.product.api.views import ProductAPIScanConfigurationViewSet, ProductViewSet from dojo.product_type.api.views import ProductTypeViewSet from dojo.test.api.views import TestsViewSet, TestTypesViewSet +from dojo.tool_config.api.views import ToolConfigurationsViewSet +from dojo.tool_product.api.views import ToolProductSettingsViewSet +from dojo.tool_type.api.views import ToolTypesViewSet from dojo.url.api.views import URLViewSet from dojo.url.models import URL from dojo.user.api.views import UserContactInfoViewSet, UsersViewSet