diff --git a/examples/kubeflow/hello_kubeflow.py b/examples/kubeflow/hello_kubeflow.py index 67b97351..0cdf01ec 100644 --- a/examples/kubeflow/hello_kubeflow.py +++ b/examples/kubeflow/hello_kubeflow.py @@ -108,8 +108,10 @@ ], # Sync the generated launch script to the pod via PVC before launch. # Required whenever you use a custom launcher (e.g. run.Torchrun()). - workdir_pvc=args.pvc, - workdir_pvc_path="/nemo-workspace", + # Must match the workdir volume name and mountPath above. + workdir_volume_mount=( + {"name": "workdir", "mountPath": "/nemo-workspace"} if args.pvc else None + ), labels={"app": JOB_NAME}, ) diff --git a/nemo_run/core/execution/kubeflow.py b/nemo_run/core/execution/kubeflow.py index ea0d8cf0..a6a480d2 100644 --- a/nemo_run/core/execution/kubeflow.py +++ b/nemo_run/core/execution/kubeflow.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy import getpass import logging import os @@ -92,10 +93,15 @@ class KubeflowExecutor(Executor): image_pull_secrets: list[str] = field(default_factory=list) spec_kwargs: dict[str, Any] = field(default_factory=dict) container_kwargs: dict[str, Any] = field(default_factory=dict) - # Workdir sync: if set, package() rsyncs job_dir → PVC before launch and - # pull_results() rsyncs the PVC back to job_dir after the job completes. - workdir_pvc: Optional[str] = None - workdir_pvc_path: str = "/nemo_run" + # Workdir sync: when ``workdir_volume_mount`` is set, package() rsyncs job_dir → PVC + # before launch and pull_results() rsyncs back. You must declare the matching Volume + # in ``volumes`` (typically persistentVolumeClaim). On construction, + # ``__post_init__`` validates that layout and merges work/data mounts into ``volume_mounts`` + # when the same ``(name, mountPath, subPath)`` is not already listed. + workdir_volume_mount: Optional[dict[str, Any]] = None + # Optional subdirectory appended to ``mountPath`` to form ``code_dir``. 
+ # Defaults to OS username. Set to ``None`` or ``""`` to use ``mountPath`` directly. + workdir_subdir: Optional[str] = field(default_factory=lambda: getpass.getuser()) # Optional local directory whose contents are merged into job_dir before # the PVC sync. Use this to include local scripts/files that are not # generated by the packager (e.g. a hand-written training script). @@ -132,12 +138,53 @@ def nnodes(self) -> int: @property def code_dir(self) -> str: - """Subdirectory on the PVC where user code (launch.sh, scripts) is synced. - - Scoped to ``//code`` so multiple users sharing - the same PVC never clobber each other's files. + """Directory inside the trainer pod where user code (launch.sh, scripts) lives. + + Computed as ``mountPath[/workdir_subdir]``. ``workdir_subdir`` defaults to + the OS username so that multiple users sharing a PVC don't collide; set it to + ``None`` or ``""`` when ``subPath`` or ``mountPath`` already provides scoping. + When ``workdir_volume_mount`` is unset, falls back to ``/nemo_run`` for local + planning only (no PVC sync). When ``workdir_volume_mount`` is set, ``mountPath`` + must be present or a ``ValueError`` is raised. """ - return f"{self.workdir_pvc_path.rstrip('/')}/{getpass.getuser()}/code" + root = self._code_mount_root() + if self.workdir_subdir: + return f"{root.rstrip('/')}/{self.workdir_subdir.strip('/')}" + return root + + def _code_mount_root(self) -> str: + wm = self.workdir_volume_mount + if not wm: + return "/nemo_run" + mp = wm.get("mountPath") + if not mp: + raise ValueError( + "workdir_volume_mount must include mountPath " + "(or omit workdir_volume_mount when not using PVC code sync)" + ) + return str(mp) + + def _get_volume_spec_copy_by_name(self, volume_name: str) -> dict[str, Any]: + """Get a copy of a volume spec by name, e.g. 
the name used in a mount""" + for v in self.volumes: + if v.get("name") == volume_name: + return copy.deepcopy(dict(v)) + raise ValueError( + f"volumes must include an entry named {volume_name!r} referenced by " + "workdir_volume_mount (needed for TrainJob pod and data-mover pod)" + ) + + @staticmethod + def _volume_mount_identity(vm: dict[str, Any]) -> tuple[Any, ...]: + """For checking if a volume mount is already in the list of volume mounts.""" + sp = vm.get("subPath") + return (vm.get("name"), vm.get("mountPath"), sp) + + def _append_volume_mount_if_missing(self, spec: dict[str, Any]) -> None: + want = self._volume_mount_identity(spec) + if any(self._volume_mount_identity(vm) == want for vm in self.volume_mounts): + return + self.volume_mounts.append(dict(spec)) def nproc_per_node(self) -> int: """Return processes per node: nprocs_per_node → gpus_per_node → 1.""" @@ -449,14 +496,14 @@ def _data_mover_pod_name(self, job_name: str) -> str: return f"{job_name}-data-mover" def _start_data_mover_pod(self, pod_name: str, timeout: int = 120) -> None: - """Spin up a throw-away Alpine pod that mounts workdir_pvc and blocks until Running. + """Spin up a throw-away Alpine pod that mounts the work PVC and blocks until Running. Uses ``kubectl cp`` (tar-based, built into Alpine — no internet needed) for data transfer. The pod inherits tolerations, affinity, and imagePullSecrets from the main workload so it can be scheduled on the same nodes (required when the PVC is zone- or node-local). 
""" - vol_name = "nemo-run-workdir" + volume_spec = self._get_volume_spec_copy_by_name(self.workdir_volume_mount["name"]) pod_spec: dict[str, Any] = { "restartPolicy": "Never", "containers": [ @@ -464,15 +511,10 @@ def _start_data_mover_pod(self, pod_name: str, timeout: int = 120) -> None: "name": "mover", "image": self.data_mover_image, "command": ["sleep", "infinity"], - "volumeMounts": [{"name": vol_name, "mountPath": self.workdir_pvc_path}], - } - ], - "volumes": [ - { - "name": vol_name, - "persistentVolumeClaim": {"claimName": self.workdir_pvc}, + "volumeMounts": [self.workdir_volume_mount], } ], + "volumes": [volume_spec], } if self.tolerations: pod_spec["tolerations"] = self.tolerations @@ -586,13 +628,13 @@ def materialize_launch_script(self, cmd: list[str], max_retries: int = 0) -> Non logger.info("Wrote launch script to %s", launch_script_path) def package(self, packager: Packager, job_name: str) -> None: - """Sync job_dir to the workdir PVC via a temporary data-mover pod before launch. + """Sync job_dir to the PVC declared for ``workdir_volume_mount`` via a data-mover pod. - Does nothing when ``workdir_pvc`` is unset. If ``workdir_local_path`` is set, + Does nothing when ``workdir_volume_mount`` is unset. If ``workdir_local_path`` is set, its contents are first rsynced into ``job_dir`` so hand-written scripts are included alongside generated files such as ``launch.sh``. """ - if not self.workdir_pvc: + if not self.workdir_volume_mount: return # Merge extra local files (e.g. training scripts) into job_dir so they # are included alongside generated files like launch.sh. @@ -608,9 +650,7 @@ def package(self, packager: Packager, job_name: str) -> None: ) logger.info("Merged '%s' into job_dir '%s'", self.workdir_local_path, self.job_dir) - # Sync job_dir to //code on the PVC via a - # throw-away data-mover pod. Scoping to a user subdirectory means we - # never clobber other data already on the shared volume. 
+ # Sync job_dir to ``code_dir`` on the PVC via a throw-away data-mover pod. pod_name = self._data_mover_pod_name(job_name) self._start_data_mover_pod(pod_name) try: @@ -618,23 +658,10 @@ def package(self, packager: Packager, job_name: str) -> None: finally: self._delete_data_mover_pod(pod_name) - # Mount the PVC so the training container can reach code_dir. - # If the PVC is already declared (e.g. explicitly by the caller for data), - # reuse that existing volume rather than adding a duplicate entry. - already_mounted = any( - v.get("persistentVolumeClaim", {}).get("claimName") == self.workdir_pvc - for v in self.volumes - ) - if not already_mounted: - vol_name = "nemo-run-workdir" - self.volumes.append( - {"name": vol_name, "persistentVolumeClaim": {"claimName": self.workdir_pvc}} - ) - if not any(vm.get("mountPath") == self.workdir_pvc_path for vm in self.volume_mounts): - self.volume_mounts.append({"name": vol_name, "mountPath": self.workdir_pvc_path}) + self._append_volume_mount_if_missing(self.workdir_volume_mount) def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None: - """Sync workdir_pvc_path back to a local directory after the job completes. + """Sync the remote code subtree (``code_dir``) back to a local directory. Args: job_name: The job name used when the job was launched. @@ -643,8 +670,10 @@ def pull_results(self, job_name: str, dest_dir: Optional[str] = None) -> None: persisted job state in ``~/.nemo_run/.kubeflow_jobs.json`` to find the original ``job_dir``. 
""" - if not self.workdir_pvc: - logger.warning("pull_results called but workdir_pvc is not set — nothing to sync") + if not self.workdir_volume_mount: + logger.warning( + "pull_results called but workdir_volume_mount is not set — nothing to sync" + ) return local_path = dest_dir or getattr(self, "job_dir", "") or "" diff --git a/nemo_run/core/execution/kuberay.py b/nemo_run/core/execution/kuberay.py index 7f4dec2b..697791af 100644 --- a/nemo_run/core/execution/kuberay.py +++ b/nemo_run/core/execution/kuberay.py @@ -15,6 +15,7 @@ # Based on https://github.com/ray-project/kuberay/blob/master/clients/python-client/python_client/utils/kuberay_cluster_utils.py import copy +import getpass import logging import os import re @@ -92,6 +93,13 @@ class KubeRayExecutor(Executor): spec_kwargs: dict[str, Any] = field(default_factory=dict) container_kwargs: dict[str, Any] = field(default_factory=dict) lifecycle_kwargs: dict[str, Any] = field(default_factory=dict) + # Workdir sync: when ``workdir_volume_mount`` is set, syncs local + # directory into ``code_dir`` on the PVC via a throw-away data-mover pod. + # You must declare the matching Volume in ``volumes``. + workdir_volume_mount: Optional[dict[str, Any]] = None + # Optional subdirectory appended to ``mountPath`` to form ``code_dir``. + # Defaults to OS username. Set to ``None`` or ``""`` to use ``mountPath`` directly. + workdir_subdir: Optional[str] = field(default_factory=lambda: getpass.getuser()) def __post_init__(self): # Set default image based on ray_version if not provided @@ -103,6 +111,45 @@ def __post_init__(self): worker_group.volumes = copy.deepcopy(self.volumes) worker_group.volume_mounts = copy.deepcopy(self.volume_mounts) + def _code_mount_root(self) -> str: + wm = self.workdir_volume_mount + if not wm: + raise ValueError( + "workdir_volume_mount is not set — cannot compute code_dir. " + "Set workdir_volume_mount on the executor, or omit workdir when starting a job." 
+ ) + mp = wm.get("mountPath") + if not mp: + raise ValueError( + "workdir_volume_mount must include mountPath " + "(or omit workdir_volume_mount when not using PVC code sync)" + ) + return str(mp) + + @property + def code_dir(self) -> str: + """Remote directory where workdir contents are synced on the PVC. + + Computed as ``mountPath[/workdir_subdir]``. ``workdir_subdir`` defaults to + the OS username so that multiple users sharing a PVC don't collide; set it to + ``None`` or ``""`` when ``subPath`` or ``mountPath`` already provides scoping. + Raises ``ValueError`` when ``workdir_volume_mount`` is not configured. + """ + root = self._code_mount_root() + if self.workdir_subdir: + return f"{root.rstrip('/')}/{self.workdir_subdir.strip('/')}" + return root + + def _get_volume_spec_copy_by_name(self, volume_name: str) -> dict[str, Any]: + """Return a deep copy of the volume spec matching *volume_name*.""" + for v in self.volumes: + if v.get("name") == volume_name: + return copy.deepcopy(dict(v)) + raise ValueError( + f"volumes must include an entry named {volume_name!r} referenced by " + "workdir_volume_mount (needed for the data-mover pod)" + ) + def get_cluster_body(self, name: str) -> dict[str, Any]: """ Get the body for the Ray cluster custom resource. 
diff --git a/nemo_run/run/ray/kuberay.py b/nemo_run/run/ray/kuberay.py index e6efe7e4..5dec1752 100644 --- a/nemo_run/run/ray/kuberay.py +++ b/nemo_run/run/ray/kuberay.py @@ -856,9 +856,10 @@ def status(self, display: bool = True) -> Dict[str, Any]: [vm.get("mountPath", "N/A") for vm in self.executor.volume_mounts] ) - # Construct workdir paths based on standard patterns - # Note: These are estimates based on the naming conventions in the code - user_workspace_base = f"{self.executor.volume_mounts[0]['mountPath']}/{self.user}/code" + if self.executor.workdir_volume_mount: + user_workspace_base = self.executor.code_dir + else: + user_workspace_base = "N/A (no workdir_volume_mount configured)" logger.info( f""" @@ -1029,26 +1030,26 @@ def start( user_workspace_path = None if workdir: - if not executor.volumes or not executor.volume_mounts: + if not executor.workdir_volume_mount: raise ValueError( - "`workdir` specified but executor has no volumes/volume_mounts to mount it." + "`workdir` specified but executor has no `workdir_volume_mount` configured." 
) - user_workspace_path = os.path.join( - executor.volume_mounts[0]["mountPath"], self.user, "code", Path(workdir).name - ) - # Add user-based scoping to pod name and workspace path + user_workspace_path = f"{executor.code_dir.rstrip('/')}/{Path(workdir).name}" pod_name = f"{self.job_name}-data-mover" if not dryrun: + volume_spec = executor._get_volume_spec_copy_by_name( + executor.workdir_volume_mount["name"] + ) sync_workdir_via_pod( pod_name=pod_name, namespace=namespace, user_workspace_path=user_workspace_path, workdir=workdir, core_v1_api=self.core_v1_api, - volumes=executor.volumes, - volume_mounts=executor.volume_mounts, + volumes=[volume_spec], + volume_mounts=[executor.workdir_volume_mount], ) logger.info(f"Synced workdir {workdir} to {user_workspace_path}") @@ -1066,12 +1067,11 @@ def start( ray_cluster_spec = ray_cluster_body.get("spec", {}) # Ensure consistent workingDir inside all Ray containers so that relative - # paths in `ray job submit` resolve as expected. + # paths in `ray job submit` resolve as expected. When a workdir was synced + # this is the same path used by sync_workdir_via_pod so files are found. 
container_workdir = "/workspace" - if workdir: - container_workdir = os.path.join( - executor.volume_mounts[0]["mountPath"], Path(workdir).name - ) + if workdir and executor.workdir_volume_mount: + container_workdir = f"{executor.code_dir.rstrip('/')}/{Path(workdir).name}" def _apply_workdir(pod_template: dict): try: diff --git a/nemo_run/run/torchx_backend/schedulers/kubeflow.py b/nemo_run/run/torchx_backend/schedulers/kubeflow.py index be6e9db8..d2e52485 100644 --- a/nemo_run/run/torchx_backend/schedulers/kubeflow.py +++ b/nemo_run/run/torchx_backend/schedulers/kubeflow.py @@ -95,10 +95,10 @@ def _submit_dryrun( # type: ignore cmd = [role.entrypoint] + role.args - # When workdir_pvc is configured, materialise a launch.sh from the + # When workdir_volume_mount is configured, materialise a launch.sh from the # Jinja2 template (env vars + training command) and point the job at # it so torchrun / launcher details stay out of the manifest. - if executor.workdir_pvc and getattr(executor, "job_dir", None): + if executor.workdir_volume_mount and getattr(executor, "job_dir", None): # Rewrite any local workdir_local_path references in the cmd. if executor.workdir_local_path: local_prefix = executor.workdir_local_path.rstrip(os.sep) diff --git a/test/core/execution/test_kubeflow.py b/test/core/execution/test_kubeflow.py index eb406fa9..3bdabce5 100644 --- a/test/core/execution/test_kubeflow.py +++ b/test/core/execution/test_kubeflow.py @@ -20,6 +20,15 @@ from nemo_run.core.execution.kubeflow import KubeflowExecutor, KubeflowJobState +# PVC workdir sync uses ``workdir_volume_mount`` plus a matching ``volumes`` entry. 
+_WORKDIR_VOLUME = {"name": "work-vol", "persistentVolumeClaim": {"claimName": "my-pvc"}} +_WORKDIR_SUBPATH = "team-a" +_WORKDIR_MOUNT = { + "name": "work-vol", + "mountPath": "/nemo_run", + "subPath": _WORKDIR_SUBPATH, +} + class TestKubeflowExecutor: @pytest.fixture @@ -387,8 +396,9 @@ def test_status_unknown_when_empty(self, mock_k8s_clients): def workdir_executor(self, mock_k8s_clients, tmp_path): e = KubeflowExecutor( image="test:latest", - workdir_pvc="my-pvc", - workdir_pvc_path="/nemo_run", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + workdir_subdir="testuser", ) e.job_dir = str(tmp_path) return e @@ -398,7 +408,7 @@ def _make_watch_events(self, phase: str): pod.status.phase = phase return [{"object": pod}] - def test_package_noop_without_workdir_pvc(self, mock_k8s_clients, tmp_path): + def test_package_noop_without_workdir_volume_mount(self, mock_k8s_clients, tmp_path): e = KubeflowExecutor(image="test:latest") e.job_dir = str(tmp_path) mock_custom, mock_core = mock_k8s_clients @@ -423,12 +433,19 @@ def test_package_syncs_to_pvc(self, workdir_executor, mock_k8s_clients): mock_core.create_namespaced_pod.assert_called_once() assert mock_check_call.call_count == 2 # mkdir + rsync - # workdir PVC auto-added to volumes/volume_mounts assert any( v.get("persistentVolumeClaim", {}).get("claimName") == "my-pvc" for v in workdir_executor.volumes ) - assert any(vm.get("mountPath") == "/nemo_run" for vm in workdir_executor.volume_mounts) + assert any( + vm.get("name") == "work-vol" + and vm.get("mountPath") == "/nemo_run" + and vm.get("subPath") == _WORKDIR_SUBPATH + for vm in workdir_executor.volume_mounts + ) + pod_body = mock_core.create_namespaced_pod.call_args[1]["body"] + mover_mounts = pod_body["spec"]["containers"][0]["volumeMounts"] + assert mover_mounts == [dict(_WORKDIR_MOUNT)] def test_package_auto_add_volume_idempotent(self, workdir_executor, mock_k8s_clients): """Calling package() twice should not duplicate 
volumes.""" @@ -454,6 +471,66 @@ def test_package_auto_add_volume_idempotent(self, workdir_executor, mock_k8s_cli ] assert len(pvc_vols) == 1 + def test_code_dir_appends_workdir_subdir(self, mock_k8s_clients): + """``code_dir`` is ``mountPath/workdir_subdir`` when subdir is set.""" + with patch("nemo_run.core.execution.kubeflow.getpass.getuser", return_value="testuser"): + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) + code_dir = e.code_dir + assert code_dir == "/nemo_run/testuser" + + def test_code_dir_returns_mount_path_when_subdir_is_none(self, mock_k8s_clients): + """``code_dir`` is exactly ``mountPath`` when ``workdir_subdir`` is ``None``.""" + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + workdir_subdir=None, + ) + assert e.code_dir == "/nemo_run" + + def test_code_dir_returns_mount_path_when_subdir_is_empty(self, mock_k8s_clients): + """``code_dir`` is exactly ``mountPath`` when ``workdir_subdir`` is ``""``.""" + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + workdir_subdir="", + ) + assert e.code_dir == "/nemo_run" + + def test_package_inserts_workdir_mount_when_existing_mountpath_lacks_subpath( + self, mock_k8s_clients, tmp_path + ): + """Volume-mount identity treats missing ``subPath`` differently from a set ``subPath``.""" + _, mock_core = mock_k8s_clients + mock_core.create_namespaced_pod.return_value = MagicMock() + mock_core.delete_namespaced_pod.return_value = MagicMock() + mock_core.read_namespaced_pod.side_effect = ApiException(status=404) + + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + volume_mounts=[{"name": "work-vol", "mountPath": "/nemo_run"}], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) + e.job_dir = str(tmp_path) + + with ( + patch("kubernetes.watch.Watch") 
as mock_watch_cls, + patch("subprocess.check_call"), + ): + mock_watch_cls.return_value.stream.return_value = self._make_watch_events("Running") + e.package(MagicMock(), "test-job") + + at_run = [vm for vm in e.volume_mounts if vm.get("mountPath") == "/nemo_run"] + assert len(at_run) == 2 + assert any(vm.get("subPath") == _WORKDIR_SUBPATH for vm in at_run) + assert any("subPath" not in vm for vm in at_run) + def test_pull_results_syncs_from_pvc(self, workdir_executor, mock_k8s_clients): _, mock_core = mock_k8s_clients mock_core.create_namespaced_pod.return_value = MagicMock() @@ -478,7 +555,7 @@ def test_pull_results_syncs_from_pvc(self, workdir_executor, mock_k8s_clients): assert "cp" in cp_args assert f"test-job-data-mover:{workdir_executor.code_dir}" in cp_args - def test_pull_results_noop_without_workdir_pvc(self, mock_k8s_clients): + def test_pull_results_noop_without_workdir_volume_mount(self, mock_k8s_clients): e = KubeflowExecutor(image="test:latest") _, mock_core = mock_k8s_clients e.pull_results("test-job") @@ -494,8 +571,8 @@ def test_data_mover_pod_inherits_tolerations_affinity_pull_secrets( e = KubeflowExecutor( image="test:latest", - workdir_pvc="my-pvc", - workdir_pvc_path="/nemo_run", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), tolerations=[{"key": "gpu", "operator": "Exists"}], affinity={"nodeAffinity": {"key": "val"}}, image_pull_secrets=["my-secret"], @@ -514,6 +591,7 @@ def test_data_mover_pod_inherits_tolerations_affinity_pull_secrets( assert spec["tolerations"] == [{"key": "gpu", "operator": "Exists"}] assert spec["affinity"] == {"nodeAffinity": {"key": "val"}} assert spec["imagePullSecrets"] == [{"name": "my-secret"}] + assert spec["containers"][0]["volumeMounts"] == [dict(_WORKDIR_MOUNT)] # ── ImportError when kubernetes unavailable ────────────────────────────── @@ -694,7 +772,8 @@ def test_start_data_mover_pod_timeout(self, mock_k8s_clients, tmp_path): e = KubeflowExecutor( image="test:latest", - 
workdir_pvc="my-pvc", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), ) e.job_dir = str(tmp_path) @@ -715,7 +794,11 @@ def test_delete_data_mover_pod_non_404_logs_warning(self, mock_k8s_clients, tmp_ _, mock_core = mock_k8s_clients mock_core.delete_namespaced_pod.side_effect = ApiException(status=500) - e = KubeflowExecutor(image="test:latest", workdir_pvc="my-pvc") + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) e.job_dir = str(tmp_path) # Should not raise; just log a warning and return @@ -728,7 +811,11 @@ def test_delete_data_mover_pod_timeout_warning(self, mock_k8s_clients, tmp_path) # Pod never disappears (read always succeeds) mock_core.read_namespaced_pod.return_value = MagicMock() - e = KubeflowExecutor(image="test:latest", workdir_pvc="my-pvc") + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) e.job_dir = str(tmp_path) with patch("time.sleep"): @@ -742,7 +829,8 @@ def test_materialize_launch_script_writes_file(self, mock_k8s_clients, tmp_path) e = KubeflowExecutor( image="test:latest", env_vars={"MY_VAR": "hello"}, - workdir_pvc="my-pvc", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), ) e.job_dir = str(tmp_path) @@ -766,7 +854,8 @@ def test_package_with_workdir_local_path(self, mock_k8s_clients, tmp_path): local_path = str(tmp_path / "local_scripts") e = KubeflowExecutor( image="test:latest", - workdir_pvc="my-pvc", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), workdir_local_path=local_path, ) e.job_dir = str(tmp_path / "job_dir") @@ -793,8 +882,12 @@ def test_package_pvc_already_mounted_no_duplicate_volume(self, mock_k8s_clients, e = KubeflowExecutor( image="test:latest", - workdir_pvc="my-pvc", volumes=[{"name": "pre-vol", "persistentVolumeClaim": {"claimName": "my-pvc"}}], + 
workdir_volume_mount={ + "name": "pre-vol", + "mountPath": "/nemo_run", + "subPath": "tenant-a", + }, ) e.job_dir = str(tmp_path) @@ -813,7 +906,11 @@ def test_package_pvc_already_mounted_no_duplicate_volume(self, mock_k8s_clients, # ── pull_results: no job_dir set and _lookup_job_dir returns empty ──────── def test_pull_results_raises_when_no_job_dir_resolvable(self, mock_k8s_clients): - e = KubeflowExecutor(image="test:latest", workdir_pvc="my-pvc") + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) # job_dir not set with patch.object(e, "_lookup_job_dir", return_value=""): @@ -826,7 +923,11 @@ def test_pull_results_uses_dest_dir_when_no_job_dir(self, mock_k8s_clients, tmp_ mock_core.delete_namespaced_pod.return_value = MagicMock() mock_core.read_namespaced_pod.side_effect = ApiException(status=404) - e = KubeflowExecutor(image="test:latest", workdir_pvc="my-pvc") + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) # job_dir not set with ( @@ -841,13 +942,21 @@ def test_pull_results_uses_dest_dir_when_no_job_dir(self, mock_k8s_clients, tmp_ # ── _lookup_job_dir ─────────────────────────────────────────────────────── def test_lookup_job_dir_returns_empty_when_no_jobs_file(self, mock_k8s_clients, tmp_path): - e = KubeflowExecutor(image="test:latest", workdir_pvc="my-pvc") + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) with patch("nemo_run.config.get_nemorun_home", return_value=str(tmp_path)): result = e._lookup_job_dir("nonexistent-job") assert result == "" def test_lookup_job_dir_returns_empty_on_exception(self, mock_k8s_clients): - e = KubeflowExecutor(image="test:latest", workdir_pvc="my-pvc") + e = KubeflowExecutor( + image="test:latest", + volumes=[dict(_WORKDIR_VOLUME)], + workdir_volume_mount=dict(_WORKDIR_MOUNT), + ) 
with patch("nemo_run.config.get_nemorun_home", side_effect=Exception("boom")): result = e._lookup_job_dir("test-job") assert result == "" diff --git a/test/run/ray/test_kuberay.py b/test/run/ray/test_kuberay.py index 8ad66d6e..1ad14713 100644 --- a/test/run/ray/test_kuberay.py +++ b/test/run/ray/test_kuberay.py @@ -409,13 +409,15 @@ def mock_k8s_clients(self): @pytest.fixture def basic_executor(self): - """Create a basic KubeRayExecutor.""" + """Create a basic KubeRayExecutor with workdir_volume_mount configured.""" return KubeRayExecutor( namespace="test-namespace", volumes=[ {"name": "workspace", "persistentVolumeClaim": {"claimName": "workspace-pvc"}} ], volume_mounts=[{"name": "workspace", "mountPath": "/workspace"}], + workdir_volume_mount={"name": "workspace", "mountPath": "/workspace"}, + workdir_subdir="testuser", ) @pytest.fixture @@ -1147,6 +1149,167 @@ def test_kuberay_worker_group_post_init_with_custom_replicas(self): assert worker_group.max_replicas == 5 +class TestKubeRayExecutorWorkdir: + """Unit tests for KubeRayExecutor workdir fields and code_dir property.""" + + _WORKDIR_VOLUME = { + "name": "work-vol", + "persistentVolumeClaim": {"claimName": "my-pvc"}, + } + _WORKDIR_MOUNT = {"name": "work-vol", "mountPath": "/workspace", "subPath": "team-a"} + + def test_code_dir_appends_workdir_subdir(self): + """code_dir is mountPath/workdir_subdir when subdir is set.""" + with patch("nemo_run.core.execution.kuberay.getpass.getuser", return_value="testuser"): + e = KubeRayExecutor( + volumes=[dict(self._WORKDIR_VOLUME)], + workdir_volume_mount=dict(self._WORKDIR_MOUNT), + ) + assert e.code_dir == "/workspace/testuser" + + def test_code_dir_returns_mount_path_when_subdir_is_none(self): + """code_dir is exactly mountPath when workdir_subdir is None.""" + e = KubeRayExecutor( + volumes=[dict(self._WORKDIR_VOLUME)], + workdir_volume_mount=dict(self._WORKDIR_MOUNT), + workdir_subdir=None, + ) + assert e.code_dir == "/workspace" + + def 
test_code_dir_returns_mount_path_when_subdir_is_empty(self): + """code_dir is exactly mountPath when workdir_subdir is ''.""" + e = KubeRayExecutor( + volumes=[dict(self._WORKDIR_VOLUME)], + workdir_volume_mount=dict(self._WORKDIR_MOUNT), + workdir_subdir="", + ) + assert e.code_dir == "/workspace" + + def test_code_dir_raises_without_workdir_volume_mount(self): + """code_dir raises ValueError when workdir_volume_mount is not configured.""" + e = KubeRayExecutor() + with pytest.raises(ValueError, match="workdir_volume_mount is not set"): + _ = e.code_dir + + def test_get_volume_spec_copy_by_name_success(self): + """_get_volume_spec_copy_by_name returns a deep copy of the named volume.""" + e = KubeRayExecutor(volumes=[dict(self._WORKDIR_VOLUME)]) + spec = e._get_volume_spec_copy_by_name("work-vol") + assert spec == self._WORKDIR_VOLUME + # Mutating the copy must not affect the original + spec["extra"] = "modified" + assert "extra" not in e.volumes[0] + + def test_get_volume_spec_copy_by_name_missing_raises(self): + """_get_volume_spec_copy_by_name raises ValueError for unknown volume name.""" + e = KubeRayExecutor(volumes=[dict(self._WORKDIR_VOLUME)]) + with pytest.raises(ValueError, match="nonexistent"): + e._get_volume_spec_copy_by_name("nonexistent") + + +class TestKubeRayJobWorkdirPaths: + """Tests that KubeRayJob.start() computes sync and container paths correctly.""" + + @pytest.fixture + def mock_k8s_clients(self): + with patch("nemo_run.run.ray.kuberay.config.load_kube_config"): + with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi") as mock_api: + with patch("nemo_run.run.ray.kuberay.client.CoreV1Api") as mock_core_api: + yield mock_api.return_value, mock_core_api.return_value + + @pytest.fixture + def workdir_executor(self): + return KubeRayExecutor( + namespace="test-namespace", + volumes=[ + {"name": "workspace", "persistentVolumeClaim": {"claimName": "workspace-pvc"}} + ], + volume_mounts=[{"name": "workspace", "mountPath": "/workspace"}], + 
workdir_volume_mount={"name": "workspace", "mountPath": "/workspace"}, + workdir_subdir="testuser", + ) + + @pytest.fixture + def job(self, workdir_executor, mock_k8s_clients): + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + return KubeRayJob(name="test-job", executor=workdir_executor) + + def _run_start_with_workdir(self, job, mock_core_api, workdir="/local/src"): + mock_core_api.create_namespaced_pod.return_value = None + with patch("nemo_run.core.execution.kuberay.watch.Watch") as mock_watch_cls: + mock_watch_cls.return_value.stream.return_value = [ + {"object": Mock(status=Mock(phase="Running"))} + ] + with patch("nemo_run.core.execution.kuberay.subprocess.check_call"): + with patch("nemo_run.run.ray.kuberay.client.CustomObjectsApi"): + job.start(command="python train.py", workdir=workdir) + + def test_data_mover_mounts_only_workdir_volume(self, job, mock_k8s_clients): + """Data-mover pod is created with only the workdir volume/mount, not all volumes.""" + _, mock_core_api = mock_k8s_clients + self._run_start_with_workdir(job, mock_core_api) + + pod_body = mock_core_api.create_namespaced_pod.call_args[1]["body"] + mover_volumes = pod_body.spec.volumes + mover_mounts = pod_body.spec.containers[0].volume_mounts + assert len(mover_volumes) == 1 + assert len(mover_mounts) == 1 + + def test_sync_destination_is_code_dir_plus_workdir_name(self, job, mock_k8s_clients): + """user_workspace_path passed to sync is executor.code_dir + workdir basename.""" + _, mock_core_api = mock_k8s_clients + mock_core_api.create_namespaced_pod.return_value = None + + with patch("nemo_run.core.execution.kuberay.watch.Watch") as mock_watch_cls: + mock_watch_cls.return_value.stream.return_value = [ + {"object": Mock(status=Mock(phase="Running"))} + ] + with patch("nemo_run.core.execution.kuberay.subprocess.check_call") as mock_check: + job.start(command="python train.py", workdir="/local/src") + + # First check_call is mkdir -p + mkdir_args = 
mock_check.call_args_list[0][0][0] + assert mkdir_args[-1] == "/workspace/testuser/src" + + def test_container_workdir_matches_sync_path(self, job, mock_k8s_clients): + """workingDir set on containers equals the sync destination path.""" + mock_api, mock_core_api = mock_k8s_clients + self._run_start_with_workdir(job, mock_core_api) + + body = mock_api.create_namespaced_custom_object.call_args.kwargs["body"] + head_containers = body["spec"]["rayClusterSpec"]["headGroupSpec"]["template"]["spec"][ + "containers" + ] + expected_workdir = "/workspace/testuser/src" + assert all(c.get("workingDir") == expected_workdir for c in head_containers) + + def test_workdir_subdir_none_uses_mountpath_directly(self, mock_k8s_clients): + """When workdir_subdir is None the sync path is mountPath/workdir_name (no user prefix).""" + _, mock_core_api = mock_k8s_clients + executor = KubeRayExecutor( + namespace="test-namespace", + volumes=[ + {"name": "workspace", "persistentVolumeClaim": {"claimName": "workspace-pvc"}} + ], + volume_mounts=[{"name": "workspace", "mountPath": "/workspace"}], + workdir_volume_mount={"name": "workspace", "mountPath": "/workspace"}, + workdir_subdir=None, + ) + with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): + job = KubeRayJob(name="test-job", executor=executor) + + mock_core_api.create_namespaced_pod.return_value = None + with patch("nemo_run.core.execution.kuberay.watch.Watch") as mock_watch_cls: + mock_watch_cls.return_value.stream.return_value = [ + {"object": Mock(status=Mock(phase="Running"))} + ] + with patch("nemo_run.core.execution.kuberay.subprocess.check_call") as mock_check: + job.start(command="python train.py", workdir="/local/src") + + mkdir_args = mock_check.call_args_list[0][0][0] + assert mkdir_args[-1] == "/workspace/src" + + class TestSyncWorkdirViaPod: """Test sync_workdir_via_pod function and related error paths.""" @@ -1328,13 +1491,15 @@ def mock_k8s_clients(self): @pytest.fixture def basic_executor(self): - 
"""Create a basic KubeRayExecutor.""" + """Create a basic KubeRayExecutor with workdir_volume_mount configured.""" return KubeRayExecutor( namespace="test-namespace", volumes=[ {"name": "workspace", "persistentVolumeClaim": {"claimName": "workspace-pvc"}} ], volume_mounts=[{"name": "workspace", "mountPath": "/workspace"}], + workdir_volume_mount={"name": "workspace", "mountPath": "/workspace"}, + workdir_subdir="testuser", ) @pytest.fixture @@ -1387,8 +1552,8 @@ def test_start_job_api_error(self, job_with_basic_executor, mock_k8s_clients): with pytest.raises(RuntimeError, match="Error creating RayJob"): job_with_basic_executor.start(command="python train.py") - def test_start_job_workdir_without_volumes(self): - """Test starting job with workdir but no volumes.""" + def test_start_job_workdir_without_workdir_volume_mount(self): + """Test starting job with workdir but no workdir_volume_mount configured.""" executor = KubeRayExecutor(namespace="test") with patch("nemo_run.run.ray.kuberay.get_user", return_value="testuser"): @@ -1398,7 +1563,8 @@ def test_start_job_workdir_without_volumes(self): job = KubeRayJob(name="test-job", executor=executor) with pytest.raises( - ValueError, match="workdir.*specified but executor has no volumes" + ValueError, + match="workdir.*specified but executor has no.*workdir_volume_mount", ): job.start(command="python train.py", workdir="/local/path") @@ -1885,13 +2051,15 @@ def mock_k8s_clients(self): @pytest.fixture def basic_executor(self): - """Create a basic KubeRayExecutor.""" + """Create a basic KubeRayExecutor with workdir_volume_mount for display banner.""" return KubeRayExecutor( namespace="test-namespace", volumes=[ {"name": "workspace", "persistentVolumeClaim": {"claimName": "workspace-pvc"}} ], volume_mounts=[{"name": "workspace", "mountPath": "/workspace"}], + workdir_volume_mount={"name": "workspace", "mountPath": "/workspace"}, + workdir_subdir="testuser", ) @pytest.fixture diff --git 
a/test/run/torchx_backend/schedulers/test_kubeflow.py b/test/run/torchx_backend/schedulers/test_kubeflow.py index c8398dfa..56636b03 100644 --- a/test/run/torchx_backend/schedulers/test_kubeflow.py +++ b/test/run/torchx_backend/schedulers/test_kubeflow.py @@ -309,14 +309,21 @@ def test_submit_dryrun_applies_macro_values(scheduler, mock_app_def, executor): assert dryrun_info.request.cmd[0] == "python" -# ── _submit_dryrun: workdir_pvc with workdir_local_path cmd rewriting ───────── +# ── _submit_dryrun: workdir_volume_mount with workdir_local_path cmd rewriting ─ -def test_submit_dryrun_with_workdir_pvc_and_local_path(scheduler, mock_app_def, mock_k8s, tmp_path): +def test_submit_dryrun_with_workdir_volume_mount_and_local_path( + scheduler, mock_app_def, mock_k8s, tmp_path +): local_path = str(tmp_path / "scripts") e = KubeflowExecutor( image="nvcr.io/nvidia/nemo:26.02", - workdir_pvc="my-pvc", + volumes=[{"name": "w", "persistentVolumeClaim": {"claimName": "my-pvc"}}], + workdir_volume_mount={ + "name": "w", + "mountPath": "/nemo_run", + "subPath": "team-a", + }, workdir_local_path=local_path, ) e.experiment_id = "test_exp" @@ -344,10 +351,17 @@ def test_submit_dryrun_with_workdir_pvc_and_local_path(scheduler, mock_app_def, assert dryrun_info.request.cmd == ["/bin/bash", f"{e.code_dir}/launch.sh"] -def test_submit_dryrun_with_workdir_pvc_no_local_path(scheduler, mock_app_def, mock_k8s, tmp_path): +def test_submit_dryrun_with_workdir_volume_mount_no_local_path( + scheduler, mock_app_def, mock_k8s, tmp_path +): e = KubeflowExecutor( image="nvcr.io/nvidia/nemo:26.02", - workdir_pvc="my-pvc", + volumes=[{"name": "w", "persistentVolumeClaim": {"claimName": "my-pvc"}}], + workdir_volume_mount={ + "name": "w", + "mountPath": "/nemo_run", + "subPath": "team-a", + }, ) e.experiment_id = "test_exp" e.job_dir = str(tmp_path)