|
2 | 2 |
|
3 | 3 | import json |
4 | 4 | import os |
| 5 | +import tarfile |
| 6 | +from io import BytesIO |
5 | 7 |
|
| 8 | +import pytest |
6 | 9 | import yaml |
7 | 10 |
|
8 | 11 |
|
@@ -269,6 +272,82 @@ def test_extension_git_explicit_opt_in_works_with_no_git(self, tmp_path): |
269 | 272 | assert (project / ".specify" / "extensions" / "git").exists() |
270 | 273 | assert not (project / ".git").exists() |
271 | 274 |
|
| 275 | + def test_tar_extension_archive_rejects_special_members(self, tmp_path): |
| 276 | + """TAR extension archives reject non-file and non-directory members.""" |
| 277 | + from specify_cli import _install_extension_archive |
| 278 | + from specify_cli.extensions import ValidationError |
| 279 | + |
| 280 | + archive_path = tmp_path / "unsafe-extension.tar" |
| 281 | + manifest = b"extension:\n id: test-ext\n name: Test\n version: 1.0.0\n" |
| 282 | + |
| 283 | + with tarfile.open(archive_path, "w") as tf: |
| 284 | + manifest_info = tarfile.TarInfo("extension.yml") |
| 285 | + manifest_info.size = len(manifest) |
| 286 | + tf.addfile(manifest_info, BytesIO(manifest)) |
| 287 | + |
| 288 | + fifo_info = tarfile.TarInfo("unsafe-fifo") |
| 289 | + fifo_info.type = tarfile.FIFOTYPE |
| 290 | + tf.addfile(fifo_info) |
| 291 | + |
| 292 | + with pytest.raises(ValidationError, match="Unsupported TAR member type"): |
| 293 | + _install_extension_archive(object(), archive_path, "0.0.0") |
| 294 | + |
| 295 | + def test_extension_url_downloads_in_bounded_chunks(self, tmp_path, monkeypatch): |
| 296 | + """URL extension downloads stream to disk instead of reading all bytes.""" |
| 297 | + import urllib.request |
| 298 | + import specify_cli |
| 299 | + |
| 300 | + payload = b"archive-bytes" |
| 301 | + read_sizes = [] |
| 302 | + |
| 303 | + class FakeResponse: |
| 304 | + headers = {"Content-Length": str(len(payload))} |
| 305 | + |
| 306 | + def __init__(self): |
| 307 | + self.offset = 0 |
| 308 | + |
| 309 | + def __enter__(self): |
| 310 | + return self |
| 311 | + |
| 312 | + def __exit__(self, exc_type, exc, tb): |
| 313 | + return False |
| 314 | + |
| 315 | + def read(self, size=-1): |
| 316 | + read_sizes.append(size) |
| 317 | + if self.offset >= len(payload): |
| 318 | + return b"" |
| 319 | + end = min(self.offset + size, len(payload)) |
| 320 | + chunk = payload[self.offset:end] |
| 321 | + self.offset = end |
| 322 | + return chunk |
| 323 | + |
| 324 | + def fake_urlopen(url, timeout): |
| 325 | + assert url == "https://example.com/extension.zip" |
| 326 | + assert timeout == 60 |
| 327 | + return FakeResponse() |
| 328 | + |
| 329 | + def fake_install(manager, archive_path, speckit_version, priority=10): |
| 330 | + assert archive_path.read_bytes() == payload |
| 331 | + assert speckit_version == "0.0.0" |
| 332 | + assert priority == 10 |
| 333 | + return "installed" |
| 334 | + |
| 335 | + monkeypatch.setattr(urllib.request, "urlopen", fake_urlopen) |
| 336 | + monkeypatch.setattr(specify_cli, "_install_extension_archive", fake_install) |
| 337 | + |
| 338 | + result = specify_cli._download_and_install_extension_url( |
| 339 | + object(), |
| 340 | + tmp_path, |
| 341 | + "https://example.com/extension.zip", |
| 342 | + "0.0.0", |
| 343 | + ) |
| 344 | + |
| 345 | + assert result == "installed" |
| 346 | + assert read_sizes == [ |
| 347 | + specify_cli.DOWNLOAD_CHUNK_BYTES, |
| 348 | + specify_cli.DOWNLOAD_CHUNK_BYTES, |
| 349 | + ] |
| 350 | + |
272 | 351 |
|
273 | 352 | class TestForceExistingDirectory: |
274 | 353 | """Tests for --force merging into an existing named directory.""" |
|
0 commit comments