Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 109 additions & 19 deletions src/datacustomcode/deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,13 @@
Callable,
Dict,
List,
Optional,
Union,
)

from loguru import logger
import pydantic
from pydantic import BaseModel
from pydantic import BaseModel, model_validator
import requests

from datacustomcode.cmd import cmd_output
Expand Down Expand Up @@ -366,23 +367,86 @@ class BaseConfig(BaseModel):
entryPoint: str


class DataObjectField(BaseModel):
name: str
label: str
dataType: str
isPrimaryKey: bool = False
keyQualifierFieldName: Optional[str] = None


class DataObject(BaseModel):
name: str
label: str
type: str
category: str
fields: list[DataObjectField]


class DataTransformConfig(BaseConfig):
sdkVersion: str
dataspace: str
permissions: Permissions
dataObjects: Optional[list[DataObject]] = None


class FunctionConfig(BaseConfig):
pass


class DloPermission(BaseModel):
dlo: list[str]


class DmoPermission(BaseModel):
dmo: list[str]


class Permissions(BaseModel):
read: Union[DloPermission]
write: Union[DloPermission]
read: Union[DloPermission, DmoPermission]
write: Union[DloPermission, DmoPermission]

@model_validator(mode="after")
def _no_mixed_layers(self) -> "Permissions":
read_is_dlo = isinstance(self.read, DloPermission)
write_is_dlo = isinstance(self.write, DloPermission)
if read_is_dlo != write_is_dlo:
raise ValueError(
"permissions.read and permissions.write must both reference "
"DLOs or both reference DMOs (got "
f"read={type(self.read).__name__}, "
f"write={type(self.write).__name__})"
)
return self


class DloPermission(BaseModel):
dlo: list[str]
def _permission_entries(perm: Union[DloPermission, DmoPermission]) -> list[str]:
"""Return the list of object names regardless of layer (DLO or DMO)."""
if isinstance(perm, DloPermission):
return perm.dlo
return perm.dmo


def _data_object_to_output(obj: DataObject) -> dict[str, Any]:
"""Convert a config.json DataObject into an outputDataObjects entry."""
fields: list[dict[str, Any]] = []
for field in obj.fields:
entry: dict[str, Any] = {
"isPrimaryKey": field.isPrimaryKey,
"label": field.label,
"name": field.name,
"type": field.dataType,
}
if field.keyQualifierFieldName is not None:
entry["keyQualifierField"] = field.keyQualifierFieldName
fields.append(entry)
return {
"category": obj.category,
"fields": fields,
"label": obj.label,
"name": obj.name,
"type": obj.type,
}


def get_config(directory: str) -> BaseConfig:
Expand All @@ -404,10 +468,17 @@ def get_config(directory: str) -> BaseConfig:
except json.JSONDecodeError as err:
raise ValueError(f"config.json at {config_path} is not valid JSON") from err
except pydantic.ValidationError as err:
missing_fields = [str(err["loc"][0]) for err in err.errors()]
errors = err.errors()
missing = [e for e in errors if e.get("type") == "missing"]
if missing and len(missing) == len(errors):
missing_fields = [str(e["loc"][0]) for e in missing]
raise ValueError(
f"config.json at {config_path} is missing required "
f"fields: {', '.join(missing_fields)}"
) from err
messages = [str(e.get("msg", "")) for e in errors]
raise ValueError(
f"config.json at {config_path} is missing required "
f"fields: {', '.join(missing_fields)}"
f"config.json at {config_path} is invalid: {'; '.join(messages)}"
) from err


Expand All @@ -421,26 +492,45 @@ def create_data_transform(
script_name = metadata.name
request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE.copy()

# Add nodes for each write DLO
for i, dlo in enumerate(data_transform_config.permissions.write.dlo, 1):
# Add nodes for each write entry (DLO or DMO)
for i, name in enumerate(
_permission_entries(data_transform_config.permissions.write), 1
):
request_hydrated["nodes"][f"node{i}"] = {
"relation_name": dlo,
"relation_name": name,
"config": {"materialized": "table"},
"compiled_code": "",
}

# Add sources for each read DLO
for i, dlo in enumerate(data_transform_config.permissions.read.dlo, 1):
request_hydrated["sources"][f"source{i}"] = {"relation_name": dlo}
# Add sources for each read entry (DLO or DMO)
for i, name in enumerate(
_permission_entries(data_transform_config.permissions.read), 1
):
request_hydrated["sources"][f"source{i}"] = {"relation_name": name}

request_hydrated["macros"]["macro.byoc"]["arguments"][0]["name"] = script_name

definition: dict[str, Any] = {
"type": "DCSQL",
"manifest": request_hydrated,
"version": "56.0",
}

# outputDataObjects is only set for DMO-backed transforms. The server requires
# the schema of any DMO created/updated by the transform; DLO transforms use
# an existing materialized table and must not include this field.
if isinstance(data_transform_config.permissions.write, DmoPermission):
if not data_transform_config.dataObjects:
raise ValueError(
"DMO transforms require 'dataObjects' in config.json describing "
"the schema of each output DMO."
)
definition["outputDataObjects"] = [
_data_object_to_output(obj) for obj in data_transform_config.dataObjects
]

body = {
"definition": {
"type": "DCSQL",
"manifest": request_hydrated,
"version": "56.0",
},
"definition": definition,
"label": f"{metadata.name}",
"name": f"{metadata.name}",
"type": "BATCH",
Expand Down
Loading
Loading