Skip to content

Commit 948e605

Browse files
committed
Task 3: Create LocalRepoTools for local filesystem access
Agent-Id: agent-4f5819dd-c4c4-4d91-944c-fc754aab9ec5 Linked-Note-Id: 12de8f2f-c265-4f1e-afba-11c93caa7e4b
1 parent 0893cff commit 948e605

1 file changed

Lines changed: 206 additions & 0 deletions

File tree

npx/python/cli/local_repo_tools.py

Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
"""Local filesystem version of RepoTools for agentic code review.
2+
3+
Provides the same interface as RepoTools but reads from local filesystem
4+
instead of GitHub API. Enables RLM to explore local files during review.
5+
"""
6+
7+
import asyncio
8+
import os
9+
import subprocess
10+
from typing import Any
11+
12+
from .repo_tools import MAX_FILE_BYTES, sanitize_path, find_line_range
13+
14+
# Common ignore patterns for directory listing
15+
IGNORE_PATTERNS = {
16+
"node_modules",
17+
"__pycache__",
18+
".git",
19+
".venv",
20+
"venv",
21+
"dist",
22+
"build",
23+
".pytest_cache",
24+
".mypy_cache",
25+
"*.egg-info",
26+
}
27+
28+
# File extensions to search
29+
SEARCH_EXTENSIONS = (
30+
".py", ".js", ".ts", ".tsx", ".jsx",
31+
".go", ".rs", ".java", ".md", ".json",
32+
".yaml", ".yml", ".sh", ".bash"
33+
)
34+
35+
36+
class LocalRepoTools:
37+
"""Tools for exploring a local repository."""
38+
39+
def __init__(self, root_path: str):
40+
"""Initialize with local directory root path.
41+
42+
Args:
43+
root_path: Absolute or relative path to repository root
44+
"""
45+
self.root_path = os.path.realpath(root_path)
46+
if not os.path.isdir(self.root_path):
47+
raise ValueError(f"root_path is not a directory: {self.root_path}")
48+
49+
def _resolve_path(self, path: str) -> str | None:
50+
"""Resolve and validate a path relative to root_path.
51+
52+
Returns absolute path if valid, None if invalid or outside root.
53+
"""
54+
clean = sanitize_path(path) if path else ""
55+
if clean is None:
56+
return None
57+
58+
# Build absolute path
59+
abs_path = os.path.realpath(os.path.join(self.root_path, clean))
60+
61+
# Security: ensure resolved path is within root_path
62+
if not abs_path.startswith(self.root_path + os.sep) and abs_path != self.root_path:
63+
return None
64+
65+
return abs_path
66+
67+
async def fetch_file(self, path: str) -> str:
68+
"""Fetch a file from the local filesystem.
69+
70+
Returns file content or error/skip stub.
71+
"""
72+
abs_path = self._resolve_path(path)
73+
if abs_path is None:
74+
return "[ERROR: invalid path]"
75+
76+
if not os.path.exists(abs_path):
77+
return "[ERROR: 404 - not found]"
78+
79+
if not os.path.isfile(abs_path):
80+
return "[SKIPPED: path is a directory, use list_directory]"
81+
82+
# Check size
83+
try:
84+
size = os.path.getsize(abs_path)
85+
except OSError:
86+
return "[ERROR: cannot read file]"
87+
88+
if size > MAX_FILE_BYTES:
89+
return f"[SKIPPED: file exceeds {MAX_FILE_BYTES // 1000}KB limit ({size // 1000}KB)]"
90+
91+
# Try to read as text
92+
try:
93+
with open(abs_path, "r", encoding="utf-8") as f:
94+
content = f.read()
95+
return content
96+
except (UnicodeDecodeError, OSError):
97+
return "[SKIPPED: binary/unsupported file]"
98+
99+
async def list_directory(self, path: str = "") -> list[dict[str, Any]]:
100+
"""List files and directories at a path.
101+
102+
Returns structured entries: [{path, type, size}]
103+
"""
104+
abs_path = self._resolve_path(path) if path else self.root_path
105+
if abs_path is None:
106+
return [{"error": "invalid path"}]
107+
108+
if not os.path.exists(abs_path):
109+
return [{"error": "not found"}]
110+
111+
# Single file case
112+
if os.path.isfile(abs_path):
113+
rel_path = os.path.relpath(abs_path, self.root_path)
114+
return [{
115+
"path": rel_path.replace(os.sep, "/"),
116+
"type": "file",
117+
"size": os.path.getsize(abs_path),
118+
}]
119+
120+
# Directory listing
121+
entries = []
122+
try:
123+
for entry in os.listdir(abs_path):
124+
# Skip hidden files/dirs
125+
if entry.startswith("."):
126+
continue
127+
# Skip ignore patterns
128+
if entry in IGNORE_PATTERNS:
129+
continue
130+
131+
entry_path = os.path.join(abs_path, entry)
132+
rel_path = os.path.relpath(entry_path, self.root_path)
133+
134+
if os.path.isdir(entry_path):
135+
entries.append({
136+
"path": rel_path.replace(os.sep, "/"),
137+
"type": "dir",
138+
"size": 0,
139+
})
140+
else:
141+
entries.append({
142+
"path": rel_path.replace(os.sep, "/"),
143+
"type": "file",
144+
"size": os.path.getsize(entry_path),
145+
})
146+
except OSError:
147+
return [{"error": "cannot read directory"}]
148+
149+
return entries
150+
151+
async def search_code(self, query: str) -> list[dict[str, Any]]:
152+
"""Search for code patterns in the local repo using grep.
153+
154+
Returns paths + fragments. Soft-fails on error (returns []).
155+
"""
156+
if not query or not query.strip():
157+
return []
158+
159+
query = query.strip()
160+
161+
# Build grep command
162+
extensions = " ".join(f"--include='*{ext}'" for ext in SEARCH_EXTENSIONS)
163+
cmd = f"grep -rn {extensions} {query} {self.root_path}"
164+
165+
try:
166+
result = subprocess.run(
167+
cmd,
168+
shell=True,
169+
capture_output=True,
170+
text=True,
171+
timeout=10,
172+
)
173+
except subprocess.TimeoutExpired:
174+
return [] # Soft fail
175+
except Exception:
176+
return [] # Soft fail
177+
178+
if result.returncode != 0:
179+
return [] # No matches or error
180+
181+
results = []
182+
for line in result.stdout.splitlines()[:10]: # Limit to 10 results
183+
# Parse grep output: path:line:content
184+
parts = line.split(":", 2)
185+
if len(parts) >= 3:
186+
file_path = parts[0]
187+
rel_path = os.path.relpath(file_path, self.root_path)
188+
fragment = parts[2][:500] # Limit fragment size
189+
results.append({
190+
"path": rel_path.replace(os.sep, "/"),
191+
"fragment": fragment,
192+
})
193+
194+
return results
195+
196+
async def close(self):
197+
"""No-op for local tools (no HTTP client to close)."""
198+
pass
199+
200+
def format_source(self, path: str, content: str | None = None, needle: str | None = None) -> str:
201+
"""Format a source citation as local:path#Lx-Ly."""
202+
line_range = ""
203+
if content:
204+
line_range = find_line_range(content, needle)
205+
return f"local:{path}{line_range}"
206+

0 commit comments

Comments
 (0)