From 89bcb1ee22585d9a9c659c276d4aa0faedad8a2f Mon Sep 17 00:00:00 2001 From: Zach Maddox Date: Thu, 4 Jun 2026 16:21:08 -0400 Subject: [PATCH 1/2] fix deploy for dmo package --- src/datacustomcode/deploy.py | 63 +++++++++++--- tests/test_deploy.py | 159 +++++++++++++++++++++++++++++++++-- 2 files changed, 199 insertions(+), 23 deletions(-) diff --git a/src/datacustomcode/deploy.py b/src/datacustomcode/deploy.py index 65495e6..96b7379 100644 --- a/src/datacustomcode/deploy.py +++ b/src/datacustomcode/deploy.py @@ -31,7 +31,7 @@ from loguru import logger import pydantic -from pydantic import BaseModel +from pydantic import BaseModel, model_validator import requests from datacustomcode.cmd import cmd_output @@ -376,13 +376,37 @@ class FunctionConfig(BaseConfig): pass +class DloPermission(BaseModel): + dlo: list[str] + + +class DmoPermission(BaseModel): + dmo: list[str] + + class Permissions(BaseModel): - read: Union[DloPermission] - write: Union[DloPermission] + read: Union[DloPermission, DmoPermission] + write: Union[DloPermission, DmoPermission] + + @model_validator(mode="after") + def _no_mixed_layers(self) -> "Permissions": + read_is_dlo = isinstance(self.read, DloPermission) + write_is_dlo = isinstance(self.write, DloPermission) + if read_is_dlo != write_is_dlo: + raise ValueError( + "permissions.read and permissions.write must both reference " + "DLOs or both reference DMOs (got " + f"read={type(self.read).__name__}, " + f"write={type(self.write).__name__})" + ) + return self -class DloPermission(BaseModel): - dlo: list[str] +def _permission_entries(perm: Union[DloPermission, DmoPermission]) -> list[str]: + """Return the list of object names regardless of layer (DLO or DMO).""" + if isinstance(perm, DloPermission): + return perm.dlo + return perm.dmo def get_config(directory: str) -> BaseConfig: @@ -404,10 +428,17 @@ def get_config(directory: str) -> BaseConfig: except json.JSONDecodeError as err: raise ValueError(f"config.json at {config_path} is not valid JSON") from err except pydantic.ValidationError as err: - missing_fields = [str(err["loc"][0]) for err in err.errors()] + errors = err.errors() + missing = [e for e in errors if e.get("type") == "missing"] + if missing and len(missing) == len(errors): + missing_fields = [str(e["loc"][0]) for e in missing] + raise ValueError( + f"config.json at {config_path} is missing required " + f"fields: {', '.join(missing_fields)}" + ) from err + messages = [str(e.get("msg", "")) for e in errors] raise ValueError( - f"config.json at {config_path} is missing required " - f"fields: {', '.join(missing_fields)}" + f"config.json at {config_path} is invalid: {'; '.join(messages)}" ) from err @@ -421,17 +452,21 @@ def create_data_transform( script_name = metadata.name request_hydrated = DATA_TRANSFORM_REQUEST_TEMPLATE.copy() - # Add nodes for each write DLO - for i, dlo in enumerate(data_transform_config.permissions.write.dlo, 1): + # Add nodes for each write entry (DLO or DMO) + for i, name in enumerate( + _permission_entries(data_transform_config.permissions.write), 1 + ): request_hydrated["nodes"][f"node{i}"] = { - "relation_name": dlo, + "relation_name": name, "config": {"materialized": "table"}, "compiled_code": "", } - # Add sources for each read DLO - for i, dlo in enumerate(data_transform_config.permissions.read.dlo, 1): - request_hydrated["sources"][f"source{i}"] = {"relation_name": dlo} + # Add sources for each read entry (DLO or DMO) + for i, name in enumerate( + _permission_entries(data_transform_config.permissions.read), 1 + ): + request_hydrated["sources"][f"source{i}"] = {"relation_name": name} request_hydrated["macros"]["macro.byoc"]["arguments"][0]["name"] = script_name diff --git a/tests/test_deploy.py b/tests/test_deploy.py index 2fd3ce2..0a7e09d 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -13,6 +13,7 @@ from datacustomcode.deploy import ( DloPermission, + DmoPermission, Permissions, get_config, ) @@ -934,6 +935,56 @@ def test_verify_data_transform_config_missing_fields(self, mock_file, mock_exist ): get_config("/test/dir/payload") + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", ' + '"dataspace": "test_dataspace", ' + '"permissions": {"read": {"dmo": ["input_dmo__dlm"]}, ' + '"write": {"dmo": ["output_dmo__dlm"]}}}' + ), + ) + def test_get_config_dmo_permissions(self, mock_file): + """DMO-only config.json parses into DmoPermission on both sides.""" + result = get_config("/test/dir") + assert isinstance(result, DataTransformConfig) + assert isinstance(result.permissions.read, DmoPermission) + assert isinstance(result.permissions.write, DmoPermission) + assert result.permissions.read.dmo == ["input_dmo__dlm"] + assert result.permissions.write.dmo == ["output_dmo__dlm"] + + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", ' + '"dataspace": "test_dataspace", ' + '"permissions": {"read": {"dlo": ["input_dlo"]}, ' + '"write": {"dmo": ["output_dmo__dlm"]}}}' + ), + ) + def test_get_config_mixed_dlo_dmo_raises(self, mock_file): + """A config that mixes DLO read with DMO write is rejected.""" + with pytest.raises(ValueError) as excinfo: + get_config("/test/dir") + msg = str(excinfo.value) + assert "read" in msg and "write" in msg + + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "1.0.0", "entryPoint": "entrypoint.py", ' + '"dataspace": "test_dataspace", ' + '"permissions": {"read": {}, "write": {"dlo": ["output_dlo"]}}}' + ), + ) + def test_get_config_empty_permission_raises(self, mock_file): + """A permission block with neither dlo nor dmo is rejected.""" + with pytest.raises(ValueError): + get_config("/test/dir") + class TestCreateDataTransform: @patch("datacustomcode.deploy.get_config") @@ -972,18 +1023,108 @@ def test_create_data_transform(self, mock_make_api_call, mock_get_config): request_body = mock_make_api_call.call_args[1]["json"] assert request_body["definition"]["type"] == "DCSQL" assert request_body["dataSpaceName"] == "test_dataspace" - assert "nodes" in request_body["definition"]["manifest"] - assert "sources" in request_body["definition"]["manifest"] - assert "macros" in request_body["definition"]["manifest"] - assert ( - request_body["definition"]["manifest"]["macros"]["macro.byoc"]["arguments"][ - 0 - ]["name"] - == "test_job" - ) + manifest = request_body["definition"]["manifest"] + assert manifest["nodes"] == { + "node1": { + "relation_name": "output_dlo", + "config": {"materialized": "table"}, + "compiled_code": "", + } + } + assert manifest["sources"] == {"source1": {"relation_name": "input_dlo"}} + assert manifest["macros"]["macro.byoc"]["arguments"][0]["name"] == "test_job" assert result == {"id": "transform_id"} + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dmo(self, mock_make_api_call, mock_get_config): + """DMO permissions emit nodes/sources with DMO relation names.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dmo_job", + version="1.0.0", + description="DMO job", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DmoPermission(dmo=["input_dmo__dlm"]), + write=DmoPermission(dmo=["output_dmo__dlm"]), + ), + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + manifest = request_body["definition"]["manifest"] + assert manifest["nodes"] == { + "node1": { + "relation_name": "output_dmo__dlm", + "config": {"materialized": "table"}, + "compiled_code": "", + } + } + assert manifest["sources"] == {"source1": {"relation_name": "input_dmo__dlm"}} + assert manifest["macros"]["macro.byoc"]["arguments"][0]["name"] == "dmo_job" + assert request_body["dataSpaceName"] == "test_dataspace" + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_multiple_dmos( + self, mock_make_api_call, mock_get_config + ): + """Multiple read DMOs become multiple sources; one write DMO is one node.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dmo_multi", + version="1.0.0", + description="DMO multi", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DmoPermission(dmo=["in1__dlm", "in2__dlm"]), + write=DmoPermission(dmo=["out__dlm"]), + ), + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + manifest = request_body["definition"]["manifest"] + assert manifest["sources"] == { + "source1": {"relation_name": "in1__dlm"}, + "source2": {"relation_name": "in2__dlm"}, + } + assert manifest["nodes"] == { + "node1": { + "relation_name": "out__dlm", + "config": {"materialized": "table"}, + "compiled_code": "", + } + } + class TestDeployFull: @patch("datacustomcode.deploy.get_config") From d477cf405ded4fae45711b47c243cdee1f751c10 Mon Sep 17 00:00:00 2001 From: Zach Maddox Date: Fri, 5 Jun 2026 16:51:08 -0400 Subject: [PATCH 2/2] pass output data object to transform api --- src/datacustomcode/deploy.py | 65 +++++++++- tests/test_deploy.py | 228 +++++++++++++++++++++++++++++++++++ 2 files changed, 288 insertions(+), 5 deletions(-) diff --git a/src/datacustomcode/deploy.py b/src/datacustomcode/deploy.py index 96b7379..e8c4ec4 100644 --- a/src/datacustomcode/deploy.py +++ b/src/datacustomcode/deploy.py @@ -26,6 +26,7 @@ Callable, Dict, List, + Optional, Union, ) @@ -366,10 +367,27 @@ class BaseConfig(BaseModel): entryPoint: str +class DataObjectField(BaseModel): + name: str + label: str + dataType: str + isPrimaryKey: bool = False + keyQualifierFieldName: Optional[str] = None + + +class DataObject(BaseModel): + name: str + label: str + type: str + category: str + fields: list[DataObjectField] + + class DataTransformConfig(BaseConfig): sdkVersion: str dataspace: str permissions: Permissions + dataObjects: Optional[list[DataObject]] = None class FunctionConfig(BaseConfig): @@ -409,6 +427,28 @@ def _permission_entries(perm: Union[DloPermission, DmoPermission]) -> list[str]: return perm.dmo +def _data_object_to_output(obj: DataObject) -> dict[str, Any]: + """Convert a config.json DataObject into an outputDataObjects entry.""" + fields: list[dict[str, Any]] = [] + for field in obj.fields: + entry: dict[str, Any] = { + "isPrimaryKey": field.isPrimaryKey, + "label": field.label, + "name": field.name, + "type": field.dataType, + } + if field.keyQualifierFieldName is not None: + entry["keyQualifierField"] = field.keyQualifierFieldName + fields.append(entry) + return { + "category": obj.category, + "fields": fields, + "label": obj.label, + "name": obj.name, + "type": obj.type, + } + + def get_config(directory: str) -> BaseConfig: """Get the code extension config from the config.json file.""" config_path = os.path.join(directory, "config.json") @@ -470,12 +510,27 @@ def create_data_transform( request_hydrated["macros"]["macro.byoc"]["arguments"][0]["name"] = script_name + definition: dict[str, Any] = { + "type": "DCSQL", + "manifest": request_hydrated, + "version": "56.0", + } + + # outputDataObjects is only set for DMO-backed transforms. The server requires + # the schema of any DMO created/updated by the transform; DLO transforms use + # an existing materialized table and must not include this field. + if isinstance(data_transform_config.permissions.write, DmoPermission): + if not data_transform_config.dataObjects: + raise ValueError( + "DMO transforms require 'dataObjects' in config.json describing " + "the schema of each output DMO." + ) + definition["outputDataObjects"] = [ + _data_object_to_output(obj) for obj in data_transform_config.dataObjects + ] + body = { - "definition": { - "type": "DCSQL", - "manifest": request_hydrated, - "version": "56.0", - }, + "definition": definition, "label": f"{metadata.name}", "name": f"{metadata.name}", "type": "BATCH", diff --git a/tests/test_deploy.py b/tests/test_deploy.py index 0a7e09d..af804d3 100644 --- a/tests/test_deploy.py +++ b/tests/test_deploy.py @@ -12,6 +12,8 @@ import requests from datacustomcode.deploy import ( + DataObject, + DataObjectField, DloPermission, DmoPermission, Permissions, @@ -985,6 +987,37 @@ def test_get_config_empty_permission_raises(self, mock_file): with pytest.raises(ValueError): get_config("/test/dir") + @patch( + "builtins.open", + new_callable=mock_open, + read_data=( + '{"sdkVersion": "0.1.14", "entryPoint": "entrypoint.py", ' + '"dataspace": "default", ' + '"permissions": {"read": {"dmo": ["ssot__Account__dlm"]}, ' + '"write": {"dmo": ["Account_DMO_Test__dlm"]}}, ' + '"dataObjects": [{' + '"name": "Account_DMO_Test__dlm", ' + '"label": "Account DMO Test", ' + '"type": "dataModelObject", ' + '"category": "profile", ' + '"fields": [{"name": "Id__c", "label": "Account Id", ' + '"dataType": "text", "isPrimaryKey": true, ' + '"keyQualifierFieldName": "KQ_Id1__c"}]' + "}]}" + ), + ) + def test_get_config_dmo_with_data_objects(self, mock_file): + """config.json parses the optional dataObjects schema for DMO writes.""" + result = get_config("/test/dir") + assert isinstance(result, DataTransformConfig) + assert result.dataObjects is not None + assert len(result.dataObjects) == 1 + obj = result.dataObjects[0] + assert obj.name == "Account_DMO_Test__dlm" + assert obj.category == "profile" + assert obj.fields[0].dataType == "text" + assert obj.fields[0].keyQualifierFieldName == "KQ_Id1__c" + class TestCreateDataTransform: @patch("datacustomcode.deploy.get_config") @@ -1059,6 +1092,23 @@ def test_create_data_transform_dmo(self, mock_make_api_call, mock_get_config): read=DmoPermission(dmo=["input_dmo__dlm"]), write=DmoPermission(dmo=["output_dmo__dlm"]), ), + dataObjects=[ + DataObject( + name="output_dmo__dlm", + label="Output DMO", + type="dataModelObject", + category="profile", + fields=[ + DataObjectField( + name="Id__c", + label="Id", + dataType="text", + isPrimaryKey=True, + keyQualifierFieldName="KQ_Id1__c", + ) + ], + ) + ], ) mock_make_api_call.return_value = {"id": "transform_id"} @@ -1104,6 +1154,23 @@ def test_create_data_transform_multiple_dmos( read=DmoPermission(dmo=["in1__dlm", "in2__dlm"]), write=DmoPermission(dmo=["out__dlm"]), ), + dataObjects=[ + DataObject( + name="out__dlm", + label="Out", + type="dataModelObject", + category="profile", + fields=[ + DataObjectField( + name="Id__c", + label="Id", + dataType="text", + isPrimaryKey=True, + keyQualifierFieldName="KQ_Id1__c", + ) + ], + ) + ], ) mock_make_api_call.return_value = {"id": "transform_id"} @@ -1125,6 +1192,167 @@ def test_create_data_transform_multiple_dmos( } } + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dmo_emits_output_data_objects( + self, mock_make_api_call, mock_get_config + ): + """DMO transforms include outputDataObjects with transformed field names.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="test_package", + version="1.0.0", + description="DMO with schema", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="0.1.14", + entryPoint="entrypoint.py", + dataspace="default", + permissions=Permissions( + read=DmoPermission(dmo=["ssot__Account__dlm"]), + write=DmoPermission(dmo=["Account_DMO_Test__dlm"]), + ), + dataObjects=[ + DataObject( + name="Account_DMO_Test__dlm", + label="Account DMO Test", + type="dataModelObject", + category="profile", + fields=[ + DataObjectField( + name="Id__c", + label="Account Id", + dataType="text", + isPrimaryKey=True, + keyQualifierFieldName="KQ_Id1__c", + ), + DataObjectField( + name="KQ_Id1__c", + label="Key Qualifier Account Id", + dataType="text", + isPrimaryKey=False, + keyQualifierFieldName=None, + ), + DataObjectField( + name="Description__c", + label="Account Description", + dataType="text", + isPrimaryKey=False, + keyQualifierFieldName=None, + ), + ], + ) + ], + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + assert request_body["definition"]["outputDataObjects"] == [ + { + "category": "profile", + "fields": [ + { + "isPrimaryKey": True, + "keyQualifierField": "KQ_Id1__c", + "label": "Account Id", + "name": "Id__c", + "type": "text", + }, + { + "isPrimaryKey": False, + "label": "Key Qualifier Account Id", + "name": "KQ_Id1__c", + "type": "text", + }, + { + "isPrimaryKey": False, + "label": "Account Description", + "name": "Description__c", + "type": "text", + }, + ], + "label": "Account DMO Test", + "name": "Account_DMO_Test__dlm", + "type": "dataModelObject", + } + ] + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dlo_omits_output_data_objects( + self, mock_make_api_call, mock_get_config + ): + """DLO transforms must not include outputDataObjects in the payload.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dlo_job", + version="1.0.0", + description="DLO job", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DloPermission(dlo=["input_dlo"]), + write=DloPermission(dlo=["output_dlo"]), + ), + ) + mock_make_api_call.return_value = {"id": "transform_id"} + + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + + request_body = mock_make_api_call.call_args[1]["json"] + assert "outputDataObjects" not in request_body["definition"] + + @patch("datacustomcode.deploy.get_config") + @patch("datacustomcode.deploy._make_api_call") + def test_create_data_transform_dmo_missing_data_objects_raises( + self, mock_make_api_call, mock_get_config + ): + """DMO transforms without dataObjects raise a clear error.""" + access_token = AccessTokenResponse( + access_token="test_token", instance_url="https://instance.example.com" + ) + metadata = CodeExtensionMetadata( + name="dmo_no_schema", + version="1.0.0", + description="DMO no schema", + computeType="CPU_M", + codeType="script", + ) + + data_transform_config = DataTransformConfig( + sdkVersion="1.0.0", + entryPoint="entrypoint.py", + dataspace="test_dataspace", + permissions=Permissions( + read=DmoPermission(dmo=["input_dmo__dlm"]), + write=DmoPermission(dmo=["output_dmo__dlm"]), + ), + ) + + with pytest.raises(ValueError, match="dataObjects"): + create_data_transform( + "/test/dir", access_token, metadata, data_transform_config + ) + class TestDeployFull: @patch("datacustomcode.deploy.get_config")