88import tempfile
99import urllib .request
1010import urllib .error
11+ import urllib .parse
1112from pathlib import Path
1213
1314REPO = "DeusData/codebase-memory-mcp"
1415
# Security: downloads are restricted to https. urllib can also open other
# schemes (for example file:// and ftp://), so a URL coming from a tainted
# source could otherwise turn a download into a local-file read.
_ALLOWED_SCHEMES = frozenset({"https"})


def _validate_url_scheme(url: str) -> None:
    """Abort the process unless ``url`` uses an allowed scheme.

    Args:
        url: The URL about to be fetched.

    Raises:
        SystemExit: If the parsed scheme is not in ``_ALLOWED_SCHEMES``.
    """
    parsed_scheme = urllib.parse.urlparse(url).scheme
    if parsed_scheme in _ALLOWED_SCHEMES:
        return
    sys.exit(
        "codebase-memory-mcp: refusing to fetch non-https URL "
        f"(scheme={parsed_scheme!r}): {url}"
    )
30+
31+
def _safe_extract_tar(tf, dest: str) -> None:
    """Extract an open tarfile into ``dest``, refusing unsafe entries.

    When the running Python's ``tarfile`` supports extraction filters
    (PEP 706: Python 3.12+, plus the security backports to older release
    lines) the built-in ``"data"`` filter is used. Otherwise every member
    is validated by hand before anything is written to disk. Mitigates
    tar-slip path traversal (CWE-22).

    Args:
        tf: An open ``tarfile.TarFile``.
        dest: Directory to extract into.

    Raises:
        SystemExit: If an entry is rejected, on either code path, so the
            user gets a one-line message instead of a traceback.
    """
    import tarfile  # local import: only needed when a tar archive is used

    # Feature-detect instead of version-checking: the attribute exists on
    # every interpreter whose extractall() accepts ``filter=``.
    if hasattr(tf, "extraction_filter"):
        try:
            tf.extractall(dest, filter="data")
        except tarfile.FilterError as exc:
            # Report filter rejections the same way the manual fallback
            # below does, rather than letting the exception escape.
            sys.exit(
                f"codebase-memory-mcp: refusing unsafe tar entry ({exc})"
            )
        return

    # Fallback for interpreters without extraction filters: validate every
    # member first, and only extract once the whole archive has passed.
    dest_abs = os.path.abspath(dest)
    for member in tf.getmembers():
        # Reject symlinks and hardlinks outright (they can escape dest).
        if member.issym() or member.islnk():
            sys.exit(
                f"codebase-memory-mcp: refusing unsafe tar entry "
                f"(link: {member.name!r})"
            )
        # Device nodes and FIFOs are refused by the "data" filter as well;
        # keep the fallback equally strict.
        if member.isdev():
            sys.exit(
                f"codebase-memory-mcp: refusing unsafe tar entry "
                f"(special file: {member.name!r})"
            )
        member_abs = os.path.abspath(os.path.join(dest_abs, member.name))
        if not (member_abs == dest_abs or member_abs.startswith(dest_abs + os.sep)):
            sys.exit(
                f"codebase-memory-mcp: refusing unsafe tar entry "
                f"(escapes dest: {member.name!r})"
            )
    tf.extractall(dest)
61+
62+
def _safe_extract_zip(zf, dest: str) -> None:
    """Extract an open zipfile into ``dest`` after vetting every entry name.

    Each name in the archive is resolved against ``dest``; if any of them
    would land outside that directory the process exits before anything is
    extracted. This is the same containment check used for tar archives
    on interpreters without extraction filters.

    Args:
        zf: An open ``zipfile.ZipFile``.
        dest: Directory to extract into.

    Raises:
        SystemExit: If an entry name resolves outside ``dest``.
    """
    root = os.path.abspath(dest)
    inside_prefix = root + os.sep
    for entry in zf.namelist():
        target = os.path.abspath(os.path.join(root, entry))
        # An entry is acceptable only if it is dest itself or lies below it.
        if target != root and not target.startswith(inside_prefix):
            sys.exit(
                "codebase-memory-mcp: refusing unsafe zip entry "
                f"(escapes dest: {entry!r})"
            )
    zf.extractall(dest)
78+
1579
1680def _version () -> str :
1781 try :
@@ -64,6 +128,7 @@ def _download(version: str) -> Path:
64128 f"https://github.com/{ REPO } /releases/download/v{ version } "
65129 f"/codebase-memory-mcp-{ os_name } -{ arch } .{ ext } "
66130 )
131+ _validate_url_scheme (url )
67132
68133 dest = _bin_path (version )
69134 dest .parent .mkdir (parents = True , exist_ok = True )
@@ -76,7 +141,7 @@ def _download(version: str) -> Path:
76141 with tempfile .TemporaryDirectory () as tmp :
77142 tmp_archive = os .path .join (tmp , f"cbm.{ ext } " )
78143 try :
79- urllib .request .urlretrieve (url , tmp_archive )
144+ urllib .request .urlretrieve (url , tmp_archive ) # noqa: S310 — scheme validated above
80145 except urllib .error .HTTPError as e :
81146 sys .exit (
82147 f"codebase-memory-mcp: download failed ({ e } )\n "
@@ -87,11 +152,11 @@ def _download(version: str) -> Path:
87152 if ext == "tar.gz" :
88153 import tarfile
89154 with tarfile .open (tmp_archive ) as tf :
90- tf . extractall ( tmp )
155+ _safe_extract_tar ( tf , tmp )
91156 else :
92157 import zipfile
93158 with zipfile .ZipFile (tmp_archive ) as zf :
94- zf . extractall ( tmp )
159+ _safe_extract_zip ( zf , tmp )
95160
96161 bin_name = "codebase-memory-mcp.exe" if os_name == "windows" else "codebase-memory-mcp"
97162 extracted = os .path .join (tmp , bin_name )
@@ -112,11 +177,15 @@ def main() -> None:
112177 if not bin_path .exists ():
113178 bin_path = _download (version )
114179
180+ # args is a list (not a shell string), so exec/subprocess treat each
181+ # element as a discrete argv entry — no shell interpretation, no
182+ # injection vector. sys.argv forwarding is the whole point of this
183+ # shim, so tainted-input suppression is intentional.
115184 args = [str (bin_path )] + sys .argv [1 :]
116185
117186 if sys .platform != "win32" :
118- os .execv (str (bin_path ), args )
187+ os .execv (str (bin_path ), args ) # noqa: S606 — list form, no shell
119188 else :
120189 import subprocess
121- result = subprocess .run (args )
190+ result = subprocess .run (args ) # noqa: S603 — list form, no shell=True
122191 sys .exit (result .returncode )
0 commit comments