|
8 | 8 | from typing import Generic |
9 | 9 | from typing import List |
10 | 10 | from typing import Optional |
| 11 | +from typing import Tuple |
11 | 12 | from typing import Type |
12 | 13 | from typing import TypeVar |
13 | 14 | from typing import Union |
|
32 | 33 | from typing_extensions import NewType |
33 | 34 | from typing_extensions import Self |
34 | 35 |
|
35 | | -from scim2_models.attributes import contains_attribute_or_subattributes |
36 | | -from scim2_models.attributes import validate_attribute_urn |
37 | 36 | from scim2_models.utils import normalize_attribute_name |
38 | 37 | from scim2_models.utils import to_camel |
39 | 38 |
|
|
42 | 41 | ExternalReference = NewType("ExternalReference", str) |
43 | 42 |
|
44 | 43 |
|
| 44 | +def validate_model_attribute(model: Type, attribute_base: str) -> None: |
| 45 | + """Validate that an attribute name or a sub-attribute path exist for a |
| 46 | + given model.""" |
| 47 | + |
| 48 | + from scim2_models.base import BaseModel |
| 49 | + |
| 50 | + attribute_name, *sub_attribute_blocks = attribute_base.split(".") |
| 51 | + sub_attribute_base = ".".join(sub_attribute_blocks) |
| 52 | + |
| 53 | + aliases = {field.validation_alias for field in model.model_fields.values()} |
| 54 | + |
| 55 | + if normalize_attribute_name(attribute_name) not in aliases: |
| 56 | + raise ValueError( |
| 57 | + f"Model '{model.__name__}' has no attribute named '{attribute_name}'" |
| 58 | + ) |
| 59 | + |
| 60 | + if sub_attribute_base: |
| 61 | + attribute_type = model.get_field_root_type(attribute_name) |
| 62 | + |
| 63 | + if not issubclass(attribute_type, BaseModel): |
| 64 | + raise ValueError( |
| 65 | + f"Attribute '{attribute_name}' is not a complex attribute, and cannot have a '{sub_attribute_base}' sub-attribute" |
| 66 | + ) |
| 67 | + |
| 68 | + validate_model_attribute(attribute_type, sub_attribute_base) |
| 69 | + |
| 70 | + |
| 71 | +def extract_schema_and_attribute_base(attribute_urn: str) -> Tuple[str, str]: |
| 72 | + # Extract the schema urn part and the attribute name part from attribute |
| 73 | + # name, as defined in :rfc:`RFC7644 §3.10 <7644#section-3.10>`. |
| 74 | + |
| 75 | + *urn_blocks, attribute_base = attribute_urn.split(":") |
| 76 | + schema = ":".join(urn_blocks) |
| 77 | + return schema, attribute_base |
| 78 | + |
| 79 | + |
| 80 | +def validate_attribute_urn( |
| 81 | + attribute_name: str, |
| 82 | + default_resource: Optional[Type] = None, |
| 83 | + resource_types: Optional[List[Type]] = None, |
| 84 | +) -> str: |
| 85 | + """Validate that an attribute urn is valid or not. |
| 86 | +
|
| 87 | + :param attribute_name: The attribute urn to check. |
| 88 | + :default_resource: The default resource if `attribute_name` is not an absolute urn. |
| 89 | + :resource_types: The available resources in which to look for the attribute. |
| 90 | + :return: The normalized attribute URN. |
| 91 | + """ |
| 92 | + |
| 93 | + from scim2_models.rfc7643.resource import Resource |
| 94 | + |
| 95 | + if not resource_types: |
| 96 | + resource_types = [] |
| 97 | + |
| 98 | + if default_resource and default_resource not in resource_types: |
| 99 | + resource_types.append(default_resource) |
| 100 | + |
| 101 | + default_schema = ( |
| 102 | + default_resource.model_fields["schemas"].default[0] |
| 103 | + if default_resource |
| 104 | + else None |
| 105 | + ) |
| 106 | + |
| 107 | + schema, attribute_base = extract_schema_and_attribute_base(attribute_name) |
| 108 | + if not schema: |
| 109 | + schema = default_schema |
| 110 | + |
| 111 | + if not schema: |
| 112 | + raise ValueError("No default schema and relative URN") |
| 113 | + |
| 114 | + resource = Resource.get_by_schema(resource_types, schema) |
| 115 | + if not resource: |
| 116 | + raise ValueError(f"No resource matching schema '{schema}'") |
| 117 | + |
| 118 | + validate_model_attribute(resource, attribute_base) |
| 119 | + |
| 120 | + return f"{schema}:{attribute_base}" |
| 121 | + |
| 122 | + |
| 123 | +def contains_attribute_or_subattributes(attribute_urns: List[str], attribute_urn: str): |
| 124 | + return attribute_urn in attribute_urns or any( |
| 125 | + item.startswith(f"{attribute_urn}.") or item.startswith(f"{attribute_urn}:") |
| 126 | + for item in attribute_urns |
| 127 | + ) |
| 128 | + |
| 129 | + |
45 | 130 | class Reference(UserString, Generic[ReferenceTypes]): |
46 | 131 | """Reference type as defined in :rfc:`RFC7643 §2.3.7 <7643#section-2.3.7>`. |
47 | 132 |
|
|
0 commit comments