diff --git a/src/datacustomcode/deploy.py b/src/datacustomcode/deploy.py index 65495e6..e8c4ec4 100644 --- a/src/datacustomcode/deploy.py +++ b/src/datacustomcode/deploy.py @@ -26,12 +26,13 @@ Callable, Dict, List, + Optional, Union, ) from loguru import logger import pydantic -from pydantic import BaseModel +from pydantic import BaseModel, model_validator import requests from datacustomcode.cmd import cmd_output @@ -366,23 +367,86 @@ class BaseConfig(BaseModel): entryPoint: str +class DataObjectField(BaseModel): + name: str + label: str + dataType: str + isPrimaryKey: bool = False + keyQualifierFieldName: Optional[str] = None + + +class DataObject(BaseModel): + name: str + label: str + type: str + category: str + fields: list[DataObjectField] + + class DataTransformConfig(BaseConfig): sdkVersion: str dataspace: str permissions: Permissions + dataObjects: Optional[list[DataObject]] = None class FunctionConfig(BaseConfig): pass +class DloPermission(BaseModel): + dlo: list[str] + + +class DmoPermission(BaseModel): + dmo: list[str] + + class Permissions(BaseModel): - read: Union[DloPermission] - write: Union[DloPermission] + read: Union[DloPermission, DmoPermission] + write: Union[DloPermission, DmoPermission] + + @model_validator(mode="after") + def _no_mixed_layers(self) -> "Permissions": + read_is_dlo = isinstance(self.read, DloPermission) + write_is_dlo = isinstance(self.write, DloPermission) + if read_is_dlo != write_is_dlo: + raise ValueError( + "permissions.read and permissions.write must both reference " + "DLOs or both reference DMOs (got " + f"read={type(self.read).__name__}, " + f"write={type(self.write).__name__})" + ) + return self -class DloPermission(BaseModel): - dlo: list[str] +def _permission_entries(perm: Union[DloPermission, DmoPermission]) -> list[str]: + """Return the list of object names regardless of layer (DLO or DMO).""" + if isinstance(perm, DloPermission): + return perm.dlo + return perm.dmo + + +def _data_object_to_output(obj: DataObject) -> dict[str, Any]: + """Convert a config.json DataObject into an outputDataObjects entry.""" + fields: list[dict[str, Any]] = [] + for field in obj.fields: + entry: dict[str, Any] = { + "isPrimaryKey": field.isPrimaryKey, + "label": field.label, + "name": field.name, + "type": field.dataType, + } + if field.keyQualifierFieldName is not None: + entry["keyQualifierField"] = field.keyQualifierFieldName + fields.append(entry) + return { + "category": obj.category, + "fields": fields, + "label": obj.label, + "name": obj.name, + "type": obj.type, + } def get_config(directory: str) -> BaseConfig: @@ -404,10 +468,17 @@ def get_config(directory: str) -> BaseConfig: except json.JSONDecodeError as err: raise ValueError(f"config.json at {config_path} is not valid JSON") from err except pydantic.ValidationError as err: - missing_fields = [str(err["loc"][0]) for err in err.errors()] + errors = err.errors() + missing = [e for e in errors if e.get("type") == "missing"] + if missing and len(missing) == len(errors): + missing_fields = [str(e["loc"][0]) for e in missing] + raise ValueError( + f"config.json at {config_path} is missing required " + f"fields: {', '.join(missing_fields)}" + ) from err + messages = [str(e.get("msg", "")) for e in errors] raise ValueError( - f"config.json at {config_path} is missing required " - f"fields: {', '.join(missing_fields)}" + f"config.json at {config_path} is invalid: {'; '.join(messages)}" ) from err @@ -421,26 +492,45 @@ def create_data_transform( script_name = metadata.name request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE.copy() - # Add nodes for each write DLO - for i, dlo in enumerate(data_transform_config.permissions.write.dlo, 1): + # Add nodes for each write entry (DLO or DMO) + for i, name in enumerate( + _permission_entries(data_transform_config.permissions.write), 1 + ): request_hydrated["nodes"][f"node{i}"] = { - "relation_name": dlo, + "relation_name": name, "config": {"materialized": "table"}, "compiled_code": "", } - # Add sources for each read DLO - for i, dlo in enumerate(data_transform_config.permissions.read.dlo, 1): - request_hydrated["sources"][f"source{i}"] = {"relation_name": dlo} + # Add sources for each read entry (DLO or DMO) + for i, name in enumerate( + _permission_entries(data_transform_config.permissions.read), 1 + ): + request_hydrated["sources"][f"source{i}"] = {"relation_name": name} request_hydrated["macros"]["macro.byoc"]["arguments"][0]["name"] = script_name + definition: dict[str, Any] = { + "type": "DCSQL", + "manifest": request_hydrated, + "version": "56.0", + } + + # outputDataObjects is only set for DMO-backed transforms. The server requires + # the schema of any DMO created/updated by the transform; DLO transforms use + # an existing materialized table and must not include this field. + if isinstance(data_transform_config.permissions.write, DmoPermission): + if not data_transform_config.dataObjects: + raise ValueError( + "DMO transforms require 'dataObjects' in config.json describing " + "the schema of each output DMO." + ) + definition["outputDataObjects"] = [ + _data_object_to_output(obj) for obj in data_transform_config.dataObjects + ] + body = { - "definition": { - "type": "DCSQL", - "manifest": request_hydrated, - "version": "56.0", - }, + "definition": definition, "label": f"{metadata.name}", "name": f"{metadata.name}", "type": "BATCH", diff --git a/tests/test_deploy.py b/tests/test_deploy.py index 2fd3ce2..af804d3 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -12,7 +12,10 @@ import requests from datacustomcode.deploy import ( + DataObject, + DataObjectField, DloPermission, + DmoPermission, Permissions, get_config, ) @@ -934,6 +937,87 @@ def test_verify_data_transform_config_missing_fields(self, mock_file, mock_exist ): get_config("/test/dir/payload") + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", ' + '"dataspace": "test_dataspace", ' + '"permissions": {"read": {"dmo": ["input_dmo__dlm"]}, ' + '"write": {"dmo": ["output_dmo__dlm"]}}}' + ), + ) + def test_get_config_dmo_permissions(self, mock_file): + """DMO-only config.json parses into DmoPermission on both sides.""" + result = get_config("/test/dir") + assert isinstance(result, DataTransformConfig) + assert isinstance(result.permissions.read, DmoPermission) + assert isinstance(result.permissions.write, DmoPermission) + assert result.permissions.read.dmo == ["input_dmo__dlm"] + assert result.permissions.write.dmo == ["output_dmo__dlm"] + + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", ' + '"dataspace": "test_dataspace", ' + '"permissions": {"read": {"dlo": ["input_dlo"]}, ' + '"write": {"dmo": ["output_dmo__dlm"]}}}' + ), + ) + def test_get_config_mixed_dlo_dmo_raises(self, mock_file): + """A config that mixes DLO read with DMO write is rejected.""" + with pytest.raises(ValueError) as excinfo: + get_config("/test/dir") + msg = str(excinfo.value) + assert "read" in msg and "write" in msg + + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", ' + '"dataspace": "test_dataspace", ' + '"permissions": {"read": {}, "write": {"dlo": ["output_dlo"]}}}' + ), + ) + def test_get_config_empty_permission_raises(self, mock_file): + """A permission block with neither dlo nor dmo is rejected.""" + with pytest.raises(ValueError): + get_config("/test/dir") + + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "0.1.14", "entryPoint": "entrypoint.py", ' + '"dataspace": "default", ' + '"permissions": {"read": {"dmo": ["ssot__Account__dlm"]}, ' + '"write": {"dmo": ["Account_DMO_Test__dlm"]}}, ' + '"dataObjects": [{' + '"name": "Account_DMO_Test__dlm", ' + '"label": "Account DMO Test", ' + '"type": "dataModelObject", ' + '"category": "profile", ' + '"fields": [{"name": "Id__c", "label": "Account Id", ' + '"dataType": "text", "isPrimaryKey": true, ' + '"keyQualifierFieldName": "KQ_Id1__c"}]' + "}]}" + ), + ) + def test_get_config_dmo_with_data_objects(self, mock_file): + """config.json parses the optional dataObjects schema for DMO writes.""" + result = get_config("/test/dir") + assert isinstance(result, DataTransformConfig) + assert result.dataObjects is not None + assert len(result.dataObjects) == 1 + obj = result.dataObjects[0] + assert obj.name == "Account_DMO_Test__dlm" + assert obj.category == "profile" + assert obj.fields[0].dataType == "text" + assert obj.fields[0].keyQualifierFieldName == "KQ_Id1__c" + class TestCreateDataTransform: @patch("datacustomcode.deploy.get_config") @@ -972,18 +1056,303 @@ def test_create_data_transform(self, mock_make_api_call, mock_get_config): request_body = mock_make_api_call.call_args[1]["json"] assert request_body["definition"]["type"] == "DCSQL" assert request_body["dataSpaceName"] == "test_dataspace" - assert "nodes" in request_body["definition"]["manifest"] - assert "sources" in request_body["definition"]["manifest"] - assert "macros" in request_body["definition"]["manifest"] - assert ( - request_body["definition"]["manifest"]["macros"]["macro.byoc"]["arguments"][ - 0 - ]["name"] - == "test_job" - ) + manifest = request_body["definition"]["manifest"] + assert manifest["nodes"] == { + "node1": { + "relation_name": "output_dlo", + "config": {"materialized": "table"}, + "compiled_code": "", + } + } + assert manifest["sources"] == {"source1": {"relation_name": "input_dlo"}} + assert manifest["macros"]["macro.byoc"]["arguments"][0]["name"] == "test_job" assert result == {"id": "transform_id"} + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dmo(self, mock_make_api_call, mock_get_config): + """DMO permissions emit nodes/sources with DMO relation names.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dmo_job", + version="1.0.0", + description="DMO job", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DmoPermission(dmo=["input_dmo__dlm"]), + write=DmoPermission(dmo=["output_dmo__dlm"]), + ), + dataObjects=[ + DataObject( + name="output_dmo__dlm", + label="Output DMO", + type="dataModelObject", + category="profile", + fields=[ + DataObjectField( + name="Id__c", + label="Id", + dataType="text", + isPrimaryKey=True, + keyQualifierFieldName="KQ_Id1__c", + ) + ], + ) + ], + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + manifest = request_body["definition"]["manifest"] + assert manifest["nodes"] == { + "node1": { + "relation_name": "output_dmo__dlm", + "config": {"materialized": "table"}, + "compiled_code": "", + } + } + assert manifest["sources"] == {"source1": {"relation_name": "input_dmo__dlm"}} + assert manifest["macros"]["macro.byoc"]["arguments"][0]["name"] == "dmo_job" + assert request_body["dataSpaceName"] == "test_dataspace" + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_multiple_dmos( + self, mock_make_api_call, mock_get_config + ): + """Multiple read DMOs become multiple sources; one write DMO is one node.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dmo_multi", + version="1.0.0", + description="DMO multi", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DmoPermission(dmo=["in1__dlm", "in2__dlm"]), + write=DmoPermission(dmo=["out__dlm"]), + ), + dataObjects=[ + DataObject( + name="out__dlm", + label="Out", + type="dataModelObject", + category="profile", + fields=[ + DataObjectField( + name="Id__c", + label="Id", + dataType="text", + isPrimaryKey=True, + keyQualifierFieldName="KQ_Id1__c", + ) + ], + ) + ], + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + manifest = request_body["definition"]["manifest"] + assert manifest["sources"] == { + "source1": {"relation_name": "in1__dlm"}, + "source2": {"relation_name": "in2__dlm"}, + } + assert manifest["nodes"] == { + "node1": { + "relation_name": "out__dlm", + "config": {"materialized": "table"}, + "compiled_code": "", + } + } + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dmo_emits_output_data_objects( + self, mock_make_api_call, mock_get_config + ): + """DMO transforms include outputDataObjects with transformed field names.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="test_package", + version="1.0.0", + description="DMO with schema", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="0.1.14", + entryPoint="entrypoint.py", + dataspace="default", + permissions=Permissions( + read=DmoPermission(dmo=["ssot__Account__dlm"]), + write=DmoPermission(dmo=["Account_DMO_Test__dlm"]), + ), + dataObjects=[ + DataObject( + name="Account_DMO_Test__dlm", + label="Account DMO Test", + type="dataModelObject", + category="profile", + fields=[ + DataObjectField( + name="Id__c", + label="Account Id", + dataType="text", + isPrimaryKey=True, + keyQualifierFieldName="KQ_Id1__c", + ), + DataObjectField( + name="KQ_Id1__c", + label="Key Qualifier Account Id", + dataType="text", + isPrimaryKey=False, + keyQualifierFieldName=None, + ), + DataObjectField( + name="Description__c", + label="Account Description", + dataType="text", + isPrimaryKey=False, + keyQualifierFieldName=None, + ), + ], + ) + ], + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + assert request_body["definition"]["outputDataObjects"] == [ + { + "category": "profile", + "fields": [ + { + "isPrimaryKey": True, + "keyQualifierField": "KQ_Id1__c", + "label": "Account Id", + "name": "Id__c", + "type": "text", + }, + { + "isPrimaryKey": False, + "label": "Key Qualifier Account Id", + "name": "KQ_Id1__c", + "type": "text", + }, + { + "isPrimaryKey": False, + "label": "Account Description", + "name": "Description__c", + "type": "text", + }, + ], + "label": "Account DMO Test", + "name": "Account_DMO_Test__dlm", + "type": "dataModelObject", + } + ] + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dlo_omits_output_data_objects( + self, mock_make_api_call, mock_get_config + ): + """DLO transforms must not include outputDataObjects in the payload.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dlo_job", + version="1.0.0", + description="DLO job", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DloPermission(dlo=["input_dlo"]), + write=DloPermission(dlo=["output_dlo"]), + ), + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + assert "outputDataObjects" not in request_body["definition"] + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dmo_missing_data_objects_raises( + self, mock_make_api_call, mock_get_config + ): + """DMO transforms without dataObjects raise a clear error.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dmo_no_schema", + version="1.0.0", + description="DMO no schema", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DmoPermission(dmo=["input_dmo__dlm"]), + write=DmoPermission(dmo=["output_dmo__dlm"]), + ), + ) + + with pytest.raises(ValueError, match="dataObjects"): + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + class TestDeployFull: @patch("datacustomcode.deploy.get_config")