Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions dojo/api_v2/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,22 @@ class Meta:
model = Endpoint_Status
fields = "__all__"

def validate(self, data):
if self.instance is not None:
# Reject changes to Endpoint/Finding on existing objects
if "endpoint" in data and data["endpoint"] != self.instance.endpoint:
msg = "endpoint cannot be changed after creation"
raise serializers.ValidationError(msg)
if "finding" in data and data["finding"] != self.instance.finding:
msg = "finding cannot be changed after creation"
raise serializers.ValidationError(msg)
return data

if data["endpoint"].product_id != data["finding"].test.engagement.product_id:
msg = "endpoint and finding must belong to the same product"
raise serializers.ValidationError(msg)
return data

def run_validators(self, initial_data):
try:
return super().run_validators(initial_data)
Expand Down
27 changes: 19 additions & 8 deletions dojo/authorization/api_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,17 +264,28 @@ def has_object_permission(self, request, view, obj):
# TODO: Delete this after the move to Locations
class UserHasEndpointStatusPermission(permissions.BasePermission):
def has_permission(self, request, view):
return check_post_permission(
request, Endpoint, "endpoint", "edit",
# Check the user can edit both the Endpoint and Finding that the Endpoint_Status will link to
return (
check_post_permission(request, Endpoint, "endpoint", "edit")
and check_post_permission(request, Finding, "finding", "edit")
)

def has_object_permission(self, request, view, obj):
return check_object_permission(
request,
obj.endpoint,
"view",
"edit",
"edit",
return (
check_object_permission(
request,
obj.endpoint,
"view",
"edit",
"edit",
)
and check_object_permission(
request,
obj.finding,
"view",
"edit",
"edit",
)
)


Expand Down
161 changes: 161 additions & 0 deletions unittests/test_endpoint_status_cross_product_authz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
from django.urls import reverse
from django.utils.timezone import now
from rest_framework.authtoken.models import Token
from rest_framework.test import APIClient

from dojo.authorization.roles_permissions import Roles
from dojo.models import (
Dojo_User,
Endpoint,
Endpoint_Status,
Engagement,
Finding,
Product,
Product_Member,
Product_Type,
Role,
Test,
Test_Type,
User,
)
from unittests.dojo_test_case import DojoAPITestCase, skip_unless_v2


@skip_unless_v2
class EndpointStatusCrossProductAuthzTest(DojoAPITestCase):

"""Tests for the Endpoint_Status ViewSet permission checks."""

fixtures = ["dojo_testdata.json"]

@classmethod
def setUpTestData(cls):
prod_type, _ = Product_Type.objects.get_or_create(name="EPS-XProd PT")
test_type, _ = Test_Type.objects.get_or_create(name="EPS-XProd Scan")
writer_role = Role.objects.get(id=Roles.Writer)

cls.product_a = Product.objects.create(
name="EPS-XProd Product A",
description="A",
prod_type=prod_type,
)
cls.product_b = Product.objects.create(
name="EPS-XProd Product B",
description="B",
prod_type=prod_type,
)

cls.alice = User.objects.create_user(
username="eps_xprod_alice",
password="not-a-real-secret", # noqa: S106 - test fixture user
)
Product_Member.objects.create(user=cls.alice, product=cls.product_a, role=writer_role)
# Legacy authorization is membership-based via authorized_users;
# mirror the Product_Member row so the user can edit Product A.
cls.product_a.authorized_users.add(Dojo_User.objects.get(pk=cls.alice.pk))

cls.endpoint_a = Endpoint.objects.create(
product=cls.product_a, protocol="http", host="a.example.com",
)
cls.endpoint_b = Endpoint.objects.create(
product=cls.product_b, protocol="http", host="b.example.com",
)

cls.finding_a = cls._make_finding(cls.product_a, test_type, title="Finding A")
cls.finding_b = cls._make_finding(cls.product_b, test_type, title="Finding B")

cls.url = reverse("endpoint_status-list")

@classmethod
def _make_finding(cls, product, test_type, *, title):
engagement = Engagement.objects.create(
name=f"{product.name} Engagement",
product=product,
target_start=now(),
target_end=now(),
)
test = Test.objects.create(
engagement=engagement,
test_type=test_type,
target_start=now(),
target_end=now(),
)
return Finding.objects.create(
test=test,
title=title,
description=title,
severity="High",
numerical_severity="S0",
active=True,
verified=True,
)

def setUp(self):
super().setUp()
token, _ = Token.objects.get_or_create(user=self.alice)
self.client = APIClient()
self.client.credentials(HTTP_AUTHORIZATION="Token " + token.key)

# ---------- create-time cross-product checks ----------

def test_create_with_alice_endpoint_and_bob_finding_is_rejected(self):
response = self.client.post(self.url, {
"endpoint": self.endpoint_a.id,
"finding": self.finding_b.id,
})
# Permission-layer denial: finding-side check should reject.
self.assertIn(response.status_code, {403, 404}, response.content[:500])
self.assertFalse(
Endpoint_Status.objects.filter(
endpoint=self.endpoint_a, finding=self.finding_b,
).exists(),
)

def test_create_with_both_in_alice_product_is_allowed(self):
response = self.client.post(self.url, {
"endpoint": self.endpoint_a.id,
"finding": self.finding_a.id,
})
self.assertEqual(201, response.status_code, response.content[:500])

# ---------- PATCH-time cross-product checks ----------

def test_patch_cannot_move_row_into_bob_product(self):
# Alice creates a legitimate row inside Product A.
row = Endpoint_Status.objects.create(
endpoint=self.endpoint_a, finding=self.finding_a,
)
relative = f"{self.url}{row.id}/"

# Status-flag-only PATCH must still work.
response = self.client.patch(relative, {"false_positive": True}, format="json")
self.assertEqual(200, response.status_code, response.content[:500])

# FK PATCH into Product B must be rejected (both endpoint+finding).
response = self.client.patch(relative, {
"endpoint": self.endpoint_b.id,
"finding": self.finding_b.id,
"false_positive": True,
"out_of_scope": True,
"risk_accepted": True,
}, format="json")
self.assertIn(response.status_code, {400, 403, 404}, response.content[:500])

row.refresh_from_db()
self.assertEqual(row.endpoint_id, self.endpoint_a.id)
self.assertEqual(row.finding_id, self.finding_a.id)

def test_patch_with_cross_product_finding_only_is_rejected(self):
# Same-product baseline row.
row = Endpoint_Status.objects.create(
endpoint=self.endpoint_a, finding=self.finding_a,
)
relative = f"{self.url}{row.id}/"

# Swapping just the finding into Product B should fail (mismatched products
# or unauthorized target finding).
response = self.client.patch(relative, {"finding": self.finding_b.id}, format="json")
self.assertIn(response.status_code, {400, 403, 404}, response.content[:500])

row.refresh_from_db()
self.assertEqual(row.finding_id, self.finding_a.id)
69 changes: 69 additions & 0 deletions unittests/test_rest_framework.py
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,75 @@ def test_create_unsuccessful(self):
self.assertEqual(400, response.status_code, response.content[:1000])
self.assertIn("This endpoint-finding relation already exists", response.content.decode("utf-8"))

def test_update(self):
# Override the base-class PATCH+PUT cycle: PUT must preserve the row's
# endpoint/finding because the serializer freezes them after creation.
current_objects = self.client.get(self.url, format="json").data
first = current_objects["results"][0]
relative_url = self.url + "{}/".format(first["id"])

response = self.client.patch(relative_url, self.update_fields)
self.assertEqual(200, response.status_code, response.content[:1000])
self.check_schema_response("patch", "200", response, detail=True)

put_payload = self.payload.copy()
put_payload["endpoint"] = first["endpoint"]
put_payload["finding"] = first["finding"]
response = self.client.put(relative_url, put_payload)
self.assertEqual(200, response.status_code, response.content[:1000])
self.check_schema_response("put", "200", response, detail=True)

def test_patch_endpoint_change_rejected(self):
# Pick the first existing row, find a different endpoint in the same
# product, and confirm PATCH refuses to re-point the row.
current_objects = self.client.get(self.url, format="json").data
first = current_objects["results"][0]
row = Endpoint_Status.objects.get(pk=first["id"])
other_endpoint = Endpoint.objects.filter(
product_id=row.endpoint.product_id,
).exclude(pk=row.endpoint_id).first()
self.assertIsNotNone(other_endpoint, "fixture needs >1 endpoint per product")

response = self.client.patch(
self.url + f"{first['id']}/",
{"endpoint": other_endpoint.id},
format="json",
)
self.assertEqual(400, response.status_code, response.content[:1000])
self.assertIn("endpoint cannot be changed after creation", response.content.decode("utf-8"))

def test_patch_finding_change_rejected(self):
current_objects = self.client.get(self.url, format="json").data
first = current_objects["results"][0]
row = Endpoint_Status.objects.get(pk=first["id"])
other_finding = Finding.objects.filter(
test__engagement__product_id=row.endpoint.product_id,
).exclude(pk=row.finding_id).first()
self.assertIsNotNone(other_finding, "fixture needs >1 finding per product")

response = self.client.patch(
self.url + f"{first['id']}/",
{"finding": other_finding.id},
format="json",
)
self.assertEqual(400, response.status_code, response.content[:1000])
self.assertIn("finding cannot be changed after creation", response.content.decode("utf-8"))

def test_patch_same_endpoint_and_finding_succeeds(self):
# PATCH that re-sends the existing FK values (no change) must still pass.
current_objects = self.client.get(self.url, format="json").data
first = current_objects["results"][0]
response = self.client.patch(
self.url + f"{first['id']}/",
{
"endpoint": first["endpoint"],
"finding": first["finding"],
"false_positive": not first.get("false_positive", False),
},
format="json",
)
self.assertEqual(200, response.status_code, response.content[:1000])

def test_create_minimal(self):
# This call should not fail even if there is not date defined
minimal_payload = {
Expand Down
Loading