fix: add NCCL-safe utilities and GRPO backend failure isolation#233
fix: add NCCL-safe utilities and GRPO backend failure isolation#233Yunnglin wants to merge 12 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces NCCL-safe fault tolerance utilities (safe_loss and @nccl_safe) to prevent distributed training hangs by catching errors gracefully and forcing zero-gradient backward passes when necessary. It also propagates the TWINKLE_FAIL_FAST environment variable, updates server configurations, and adds extensive E2E and unit tests. However, several critical issues were identified in the new nccl_safe.py implementation: first, safe_loss returns a function wrapper instead of a Loss instance, which will fail pipeline assertions expecting a Loss subclass; second, the fallback path in _force_zero_backward can raise an AttributeError if model.model is a list, and detaching the parameter breaks the autograd graph connectivity required for FSDP hooks to fire.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
- Add ping() method to all model backends (mock/transformers/megatron) - Add check_model_health() to ModelManagement that calls ping() - Add /healthz endpoint on model deployment (returns 503 if actors dead) - Add /twinkle/healthz/deep on gateway that probes all model deployments - Update entrypoint.sh watchdog to check deep health URL - Update validation middleware to skip sticky session for /healthz/* paths - Fix start_e2e_server.py DEFAULT_CONFIG path
…nly for PP>1 When variable_seq_lengths=True with Pipeline Parallelism, logps from different DP ranks / microbatches have different seq_lens. After HTTP serialization + collect_tensor_dict, they become ragged nested lists that torch.as_tensor cannot handle. Changes: - megatron_model.py: Flatten and pad_and_stack ragged ref_outputs logps in forward_backward before passing to MegatronModel (twinkle client) - common.py: Handle ragged list[list] in _tensor_output_to_rows by flattening microbatch nesting and pad_and_stack (tinker client) - nccl_safe.py: Add traceback to safe_loss and nccl_safe_megatron error logging for better diagnostics - Add tinker client DPO PP E2E test
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 37 out of 37 changed files in this pull request and generated 8 comments.
Comments suppressed due to low confidence (1)
tests/server/integration/test_full_cycle_e2e.py:63
- SAVE_DIR is hard-coded to a
/mnt/nas2/...path, which makes this E2E test fail outside that specific environment. Use an env var override and default to a writable temp directory.
| try: | ||
| params = [p for p in _iter_model_params(model) if p.requires_grad] | ||
| if params: | ||
| param = params[0] | ||
| zero_loss = (param.flatten()[0] * 0).sum() | ||
| else: | ||
| zero_loss = torch.zeros((), device='cuda', requires_grad=True) | ||
| except Exception: | ||
| zero_loss = torch.zeros((), device='cuda', requires_grad=True) |
| @remote_function(dispatch='slice_dp', collect=collect_tensor_dict) | ||
| @nccl_safe_megatron(forward_only=True) | ||
| def forward_only(self, *, inputs: InputFeature | list[InputFeature] | Trajectory | list[Trajectory], **kwargs): |
| @remote_function(dispatch='slice_dp', collect=collect_tensor_dict) | ||
| @nccl_safe_megatron | ||
| def forward_backward(self, *, inputs: InputFeature | list[InputFeature] | Trajectory | list[Trajectory], **kwargs): |
| # Flatten [[mb0_sample0, mb0_sample1], [mb1_sample0, ...]] → [sample0, sample1, ...] | ||
| flat = [s for item in logps for s in (item if isinstance(item[0], (list, tuple)) else [item])] |
| if isinstance(value, list) and value and isinstance(value[0], (list, tuple)): | ||
| flat = [s for item in value for s in (item if isinstance(item[0], (list, tuple)) else [item])] |
| RAY = "/mnt/nas2/anaconda3/envs/tinker_myl/bin/ray" | ||
| PYTHON = "/mnt/nas2/anaconda3/envs/tinker_myl/bin/python" | ||
| WORKDIR = "/mnt/nas2/yunlin.myl/twinkle" | ||
| DEFAULT_CONFIG = "tests/server/config/server_config_4b_e2e.yaml" | ||
| RAY_TEMP_DIR = "/mnt/nas2/yunlin.myl/ray_logs" | ||
| SERVER_LOG = os.path.join(WORKDIR, "server_e2e.log") |
| subprocess.Popen( | ||
| cmd, shell=True, cwd=WORKDIR, stdout=log_fd, stderr=log_fd, | ||
| env={**os.environ, "TWINKLE_TRUST_REMOTE_CODE": "1"}, | ||
| start_new_session=True, | ||
| ) |
| runtime_env: | ||
| env_vars: | ||
| TWINKLE_TRUST_REMOTE_CODE: "0" | ||
| TWINKLE_TRUST_REMOTE_CODE: "1" | ||
| TWINKLE_FAIL_FAST: "0" |
This PR adds NCCL-safe utilities to prevent distributed training hangs when GRPO backend errors occur after forward but before backward completes.