diff --git a/dojo/api_v2/serializers.py b/dojo/api_v2/serializers.py index b3b83a239b5..52f4ad3cbc8 100644 --- a/dojo/api_v2/serializers.py +++ b/dojo/api_v2/serializers.py @@ -865,6 +865,22 @@ class Meta: model = Endpoint_Status fields = "__all__" + def validate(self, data): + if self.instance is not None: + # Reject changes to Endpoint/Finding on existing objects + if "endpoint" in data and data["endpoint"] != self.instance.endpoint: + msg = "endpoint cannot be changed after creation" + raise serializers.ValidationError(msg) + if "finding" in data and data["finding"] != self.instance.finding: + msg = "finding cannot be changed after creation" + raise serializers.ValidationError(msg) + return data + + if data["endpoint"].product_id != data["finding"].test.engagement.product_id: + msg = "endpoint and finding must belong to the same product" + raise serializers.ValidationError(msg) + return data + def run_validators(self, initial_data): try: return super().run_validators(initial_data) diff --git a/dojo/authorization/api_permissions.py b/dojo/authorization/api_permissions.py index d8237c382f0..861ad9331ef 100644 --- a/dojo/authorization/api_permissions.py +++ b/dojo/authorization/api_permissions.py @@ -264,17 +264,28 @@ def has_object_permission(self, request, view, obj): # TODO: Delete this after the move to Locations class UserHasEndpointStatusPermission(permissions.BasePermission): def has_permission(self, request, view): - return check_post_permission( - request, Endpoint, "endpoint", "edit", + # Check the user can edit both the Endpoint and Finding that the Endpoint_Status will link to + return ( + check_post_permission(request, Endpoint, "endpoint", "edit") + and check_post_permission(request, Finding, "finding", "edit") ) def has_object_permission(self, request, view, obj): - return check_object_permission( - request, - obj.endpoint, - "view", - "edit", - "edit", + return ( + check_object_permission( + request, + obj.endpoint, + "view", + "edit", + "edit", + ) + and check_object_permission( + request, + obj.finding, + "view", + "edit", + "edit", + ) ) diff --git a/unittests/test_endpoint_status_cross_product_authz.py b/unittests/test_endpoint_status_cross_product_authz.py new file mode 100644 index 00000000000..f570d8c2469 --- /dev/null +++ b/unittests/test_endpoint_status_cross_product_authz.py @@ -0,0 +1,161 @@ +from django.urls import reverse +from django.utils.timezone import now +from rest_framework.authtoken.models import Token +from rest_framework.test import APIClient + +from dojo.authorization.roles_permissions import Roles +from dojo.models import ( + Dojo_User, + Endpoint, + Endpoint_Status, + Engagement, + Finding, + Product, + Product_Member, + Product_Type, + Role, + Test, + Test_Type, + User, +) +from unittests.dojo_test_case import DojoAPITestCase, skip_unless_v2 + + +@skip_unless_v2 +class EndpointStatusCrossProductAuthzTest(DojoAPITestCase): + + """Tests for the Endpoint_Status ViewSet permission checks.""" + + fixtures = ["dojo_testdata.json"] + + @classmethod + def setUpTestData(cls): + prod_type, _ = Product_Type.objects.get_or_create(name="EPS-XProd PT") + test_type, _ = Test_Type.objects.get_or_create(name="EPS-XProd Scan") + writer_role = Role.objects.get(id=Roles.Writer) + + cls.product_a = Product.objects.create( + name="EPS-XProd Product A", + description="A", + prod_type=prod_type, + ) + cls.product_b = Product.objects.create( + name="EPS-XProd Product B", + description="B", + prod_type=prod_type, + ) + + cls.alice = User.objects.create_user( + username="eps_xprod_alice", + password="not-a-real-secret", # noqa: S106 - test fixture user + ) + Product_Member.objects.create(user=cls.alice, product=cls.product_a, role=writer_role) + # Legacy authorization is membership-based via authorized_users; + # mirror the Product_Member row so the user can edit Product A. + cls.product_a.authorized_users.add(Dojo_User.objects.get(pk=cls.alice.pk)) + + cls.endpoint_a = Endpoint.objects.create( + product=cls.product_a, protocol="http", host="a.example.com", + ) + cls.endpoint_b = Endpoint.objects.create( + product=cls.product_b, protocol="http", host="b.example.com", + ) + + cls.finding_a = cls._make_finding(cls.product_a, test_type, title="Finding A") + cls.finding_b = cls._make_finding(cls.product_b, test_type, title="Finding B") + + cls.url = reverse("endpoint_status-list") + + @classmethod + def _make_finding(cls, product, test_type, *, title): + engagement = Engagement.objects.create( + name=f"{product.name} Engagement", + product=product, + target_start=now(), + target_end=now(), + ) + test = Test.objects.create( + engagement=engagement, + test_type=test_type, + target_start=now(), + target_end=now(), + ) + return Finding.objects.create( + test=test, + title=title, + description=title, + severity="High", + numerical_severity="S0", + active=True, + verified=True, + ) + + def setUp(self): + super().setUp() + token, _ = Token.objects.get_or_create(user=self.alice) + self.client = APIClient() + self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key) + + # ---------- create-time cross-product checks ---------- + + def test_create_with_alice_endpoint_and_bob_finding_is_rejected(self): + response = self.client.post(self.url, { + "endpoint": self.endpoint_a.id, + "finding": self.finding_b.id, + }) + # Permission-layer denial: finding-side check should reject. + self.assertIn(response.status_code, {403, 404}, response.content[:500]) + self.assertFalse( + Endpoint_Status.objects.filter( + endpoint=self.endpoint_a, finding=self.finding_b, + ).exists(), + ) + + def test_create_with_both_in_alice_product_is_allowed(self): + response = self.client.post(self.url, { + "endpoint": self.endpoint_a.id, + "finding": self.finding_a.id, + }) + self.assertEqual(201, response.status_code, response.content[:500]) + + # ---------- PATCH-time cross-product checks ---------- + + def test_patch_cannot_move_row_into_bob_product(self): + # Alice creates a legitimate row inside Product A. + row = Endpoint_Status.objects.create( + endpoint=self.endpoint_a, finding=self.finding_a, + ) + relative = f"{self.url}{row.id}/" + + # Status-flag-only PATCH must still work. + response = self.client.patch(relative, {"false_positive": True}, format="json") + self.assertEqual(200, response.status_code, response.content[:500]) + + # FK PATCH into Product B must be rejected (both endpoint+finding). + response = self.client.patch(relative, { + "endpoint": self.endpoint_b.id, + "finding": self.finding_b.id, + "false_positive": True, + "out_of_scope": True, + "risk_accepted": True, + }, format="json") + self.assertIn(response.status_code, {400, 403, 404}, response.content[:500]) + + row.refresh_from_db() + self.assertEqual(row.endpoint_id, self.endpoint_a.id) + self.assertEqual(row.finding_id, self.finding_a.id) + + def test_patch_with_cross_product_finding_only_is_rejected(self): + # Same-product baseline row. + row = Endpoint_Status.objects.create( + endpoint=self.endpoint_a, finding=self.finding_a, + ) + relative = f"{self.url}{row.id}/" + + # Swapping just the finding into Product B should fail (mismatched products + # or unauthorized target finding). + response = self.client.patch(relative, {"finding": self.finding_b.id}, format="json") + self.assertIn(response.status_code, {400, 403, 404}, response.content[:500]) + + row.refresh_from_db() + self.assertEqual(row.finding_id, self.finding_a.id) diff --git a/unittests/test_rest_framework.py b/unittests/test_rest_framework.py index 4455dba1374..781d7c673a0 100644 --- a/unittests/test_rest_framework.py +++ b/unittests/test_rest_framework.py @@ -1225,6 +1225,75 @@ def test_create_unsuccessful(self): self.assertEqual(400, response.status_code, response.content[:1000]) self.assertIn("This endpoint-finding relation already exists", response.content.decode("utf-8")) + def test_update(self): + # Override the base-class PATCH+PUT cycle: PUT must preserve the row's + # endpoint/finding because the serializer freezes them after creation. + current_objects = self.client.get(self.url, format="json").data + first = current_objects["results"][0] + relative_url = self.url + "{}/".format(first["id"]) + + response = self.client.patch(relative_url, self.update_fields) + self.assertEqual(200, response.status_code, response.content[:1000]) + self.check_schema_response("patch", "200", response, detail=True) + + put_payload = self.payload.copy() + put_payload["endpoint"] = first["endpoint"] + put_payload["finding"] = first["finding"] + response = self.client.put(relative_url, put_payload) + self.assertEqual(200, response.status_code, response.content[:1000]) + self.check_schema_response("put", "200", response, detail=True) + + def test_patch_endpoint_change_rejected(self): + # Pick the first existing row, find a different endpoint in the same + # product, and confirm PATCH refuses to re-point the row. + current_objects = self.client.get(self.url, format="json").data + first = current_objects["results"][0] + row = Endpoint_Status.objects.get(pk=first["id"]) + other_endpoint = Endpoint.objects.filter( + product_id=row.endpoint.product_id, + ).exclude(pk=row.endpoint_id).first() + self.assertIsNotNone(other_endpoint, "fixture needs >1 endpoint per product") + + response = self.client.patch( + self.url + f"{first['id']}/", + {"endpoint": other_endpoint.id}, + format="json", + ) + self.assertEqual(400, response.status_code, response.content[:1000]) + self.assertIn("endpoint cannot be changed after creation", response.content.decode("utf-8")) + + def test_patch_finding_change_rejected(self): + current_objects = self.client.get(self.url, format="json").data + first = current_objects["results"][0] + row = Endpoint_Status.objects.get(pk=first["id"]) + other_finding = Finding.objects.filter( + test__engagement__product_id=row.endpoint.product_id, + ).exclude(pk=row.finding_id).first() + self.assertIsNotNone(other_finding, "fixture needs >1 finding per product") + + response = self.client.patch( + self.url + f"{first['id']}/", + {"finding": other_finding.id}, + format="json", + ) + self.assertEqual(400, response.status_code, response.content[:1000]) + self.assertIn("finding cannot be changed after creation", response.content.decode("utf-8")) + + def test_patch_same_endpoint_and_finding_succeeds(self): + # PATCH that re-sends the existing FK values (no change) must still pass. + current_objects = self.client.get(self.url, format="json").data + first = current_objects["results"][0] + response = self.client.patch( + self.url + f"{first['id']}/", + { + "endpoint": first["endpoint"], + "finding": first["finding"], + "false_positive": not first.get("false_positive", False), + }, + format="json", + ) + self.assertEqual(200, response.status_code, response.content[:1000]) + def test_create_minimal(self): # This call should not fail even if there is not date defined minimal_payload = {