Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
380 changes: 379 additions & 1 deletion npx/python/cli/local_repo_tools.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,388 @@ async def search_code(self, query: str) -> list[dict[str, Any]]:

return results

async def get_symbol_definition(self, symbol: str, context_file: str = "") -> str:
    """Find the definition of a symbol (function or class).

    Greps the repo for ``def <symbol>`` / ``class <symbol>`` patterns and
    returns a ``local:path#Lline`` citation plus a snippet of the first
    match, or an ``[ERROR: ...]`` string on failure.
    """
    if not symbol or not symbol.strip():
        return "[ERROR: empty symbol]"

    symbol = symbol.strip()

    # -E enables extended regex so the (def|class) alternation works;
    # without it grep searches for the literal text "(def|class)" and
    # the lookup never matches anything.
    args = ["grep", "-rn", "-E"]
    for ext in SEARCH_EXTENSIONS:
        args.append(f"--include=*{ext}")
    args.append("--")
    # Search for "def symbol" or "class symbol"
    args.append(f"(def|class) {symbol}")
    args.append(self.root_path)

    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: search timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    # grep exits non-zero when nothing matched.
    if result.returncode != 0:
        return f"[ERROR: symbol '{symbol}' not found]"

    # Parse first match
    lines = result.stdout.splitlines()
    if not lines:
        return f"[ERROR: symbol '{symbol}' not found]"

    # grep -rn output is "path:line:content"; split at most twice so a
    # snippet containing colons stays intact.
    first_match = lines[0]
    parts = first_match.split(":", 2)
    if len(parts) >= 3:
        file_path = parts[0]
        line_num = parts[1]
        rel_path = os.path.relpath(file_path, self.root_path)
        snippet = parts[2][:200]
        return f"local:{rel_path}#L{line_num}\n{snippet}"

    return "[ERROR: could not parse definition]"

async def find_usages(self, symbol: str, scope_path: str = ".") -> str:
    """Find all usages of a symbol in the codebase.

    Greps *scope_path* (default: repo root) for literal occurrences of
    *symbol* and returns a formatted list of up to 20 matches, or an
    ``[ERROR: ...]`` string when nothing is found.
    """
    if not symbol or not symbol.strip():
        return "[ERROR: empty symbol]"

    symbol = symbol.strip()

    # Resolve scope path; fall back to the repo root when invalid.
    scope_abs = self._resolve_path(scope_path) if scope_path != "." else self.root_path
    if scope_abs is None:
        scope_abs = self.root_path

    # -F searches for the symbol as a fixed string; without it grep
    # treats the symbol as a regex, so names containing metacharacters
    # (e.g. "obj.attr" or "f(") match incorrectly or error out.
    args = ["grep", "-rn", "-F"]
    for ext in SEARCH_EXTENSIONS:
        args.append(f"--include=*{ext}")
    args.append("--")
    args.append(symbol)
    args.append(scope_abs)

    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: search timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    # grep exits non-zero when nothing matched.
    if result.returncode != 0:
        return f"[ERROR: no usages found for '{symbol}']"

    # Format results
    results = []
    for line in result.stdout.splitlines()[:20]:  # Limit to 20 results
        parts = line.split(":", 2)
        if len(parts) >= 3:
            file_path = parts[0]
            line_num = parts[1]
            rel_path = os.path.relpath(file_path, self.root_path)
            snippet = parts[2][:100]
            results.append(f" {rel_path}:{line_num} {snippet}")

    if not results:
        return f"[ERROR: no usages found for '{symbol}']"

    return f"Found {len(results)} usages of '{symbol}':\n" + "\n".join(results)

async def get_type_hierarchy(self, class_name: str) -> str:
    """Get the type hierarchy (parent classes) for a class.

    Finds the class definition via grep and parses the parent classes
    out of the ``class X(Parent1, Parent2):`` header. Returns a
    ``local:path#Lline`` citation plus the reconstructed header, or an
    ``[ERROR: ...]`` string on failure.
    """
    if not class_name or not class_name.strip():
        return "[ERROR: empty class name]"

    class_name = class_name.strip()

    # Find class definition
    args = ["grep", "-rn"]
    for ext in SEARCH_EXTENSIONS:
        args.append(f"--include=*{ext}")
    args.append("--")
    args.append(f"class {class_name}")
    args.append(self.root_path)

    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: search timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    if result.returncode != 0:
        return f"[ERROR: class '{class_name}' not found]"

    # Parse first match to extract parent classes
    lines = result.stdout.splitlines()
    if not lines:
        return f"[ERROR: class '{class_name}' not found]"

    first_match = lines[0]
    parts = first_match.split(":", 2)
    if len(parts) < 3:
        return "[ERROR: could not parse class definition]"

    file_path = parts[0]
    line_num = parts[1]
    definition = parts[2]
    rel_path = os.path.relpath(file_path, self.root_path)

    # Local import kept: the module header is outside this chunk.
    import re

    # Anchor the pattern to the requested class name so a different
    # class definition on the matched line is not parsed by mistake,
    # and use [^)]* so matching stops at the first closing paren.
    match = re.search(rf"class\s+{re.escape(class_name)}\s*\(([^)]*)\)", definition)
    parents = []
    if match:
        # Filter out empty entries so "class X():" yields no parents
        # instead of the bogus [''].
        parents = [p.strip() for p in match.group(1).split(",") if p.strip()]

    hierarchy = f"local:{rel_path}#L{line_num}\n"
    hierarchy += f"class {class_name}"
    if parents:
        hierarchy += f"({', '.join(parents)})"
    hierarchy += ":"

    return hierarchy

async def get_call_graph(self, func_name: str, depth: int = 1) -> str:
    """Get the call graph for a function (its definition plus callers).

    Only direct callers are reported; *depth* is accepted for API
    compatibility but a single level is searched for performance.
    Returns a ``local:path#Lline`` citation for the definition followed
    by up to 10 caller locations, or an ``[ERROR: ...]`` string.
    """
    if not func_name or not func_name.strip():
        return "[ERROR: empty function name]"

    func_name = func_name.strip()

    # Find function definition
    args = ["grep", "-rn"]
    for ext in SEARCH_EXTENSIONS:
        args.append(f"--include=*{ext}")
    args.append("--")
    args.append(f"def {func_name}")
    args.append(self.root_path)

    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: search timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    if result.returncode != 0:
        return f"[ERROR: function '{func_name}' not found]"

    lines = result.stdout.splitlines()
    if not lines:
        return f"[ERROR: function '{func_name}' not found]"

    first_match = lines[0]
    parts = first_match.split(":", 2)
    if len(parts) < 3:
        return "[ERROR: could not parse function definition]"

    file_path = parts[0]
    line_num = parts[1]
    rel_path = os.path.relpath(file_path, self.root_path)

    # Find callers of this function
    caller_args = ["grep", "-rn"]
    for ext in SEARCH_EXTENSIONS:
        caller_args.append(f"--include=*{ext}")
    caller_args.append("--")
    caller_args.append(f"{func_name}(")
    caller_args.append(self.root_path)

    callers = []
    try:
        caller_result = subprocess.run(
            caller_args,
            capture_output=True,
            text=True,
            timeout=10,
        )
        if caller_result.returncode == 0:
            for line in caller_result.stdout.splitlines():
                parts = line.split(":", 2)
                if len(parts) < 3:
                    continue
                # "name(" also matches the "def name(" line, so skip
                # the definition itself — it is not a caller.
                if f"def {func_name}" in parts[2]:
                    continue
                caller_rel = os.path.relpath(parts[0], self.root_path)
                callers.append(f" {caller_rel}:{parts[1]}")
                if len(callers) >= 10:  # Limit to 10 callers
                    break
    except Exception:
        pass  # Soft fail on caller search

    result_str = f"local:{rel_path}#L{line_num}\ndef {func_name}(...)"
    if callers:
        result_str += f"\n\nCallers ({len(callers)}):\n" + "\n".join(callers)
    else:
        result_str += "\n\nNo callers found"

    return result_str

async def get_pr_comments(self, pr_number: int) -> str:
    """Return a fixed notice: PR comments are only available in --url mode."""
    # A local checkout carries no pull-request discussion to fetch.
    return "[Not available for local repos — use --url mode]"

async def get_blame(self, path: str, line_range: str = "") -> str:
    """Get git blame information for a file or line range.

    *line_range* uses the form "start,end" (e.g. "10,20"); a malformed
    range is ignored and the whole file is blamed instead. Output is
    capped at 30 lines.
    """
    if not path or not path.strip():
        return "[ERROR: empty path]"

    abs_path = self._resolve_path(path)
    if abs_path is None:
        return "[ERROR: invalid path]"

    if not os.path.exists(abs_path):
        return "[ERROR: file not found]"

    # Build git blame command
    args = ["git", "blame"]
    if line_range:
        # Validate "start,end" explicitly. The previous try/except here
        # could never fire (str.split does not raise), so non-numeric
        # ranges leaked through to git and failed the whole command
        # despite the intent to ignore malformed input.
        bounds = [p.strip() for p in line_range.split(",")]
        if len(bounds) == 2 and bounds[0].isdigit() and bounds[1].isdigit():
            args.append(f"-L{bounds[0]},{bounds[1]}")

    args.append(abs_path)

    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
            cwd=self.root_path,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: blame timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    if result.returncode != 0:
        return "[ERROR: git blame failed]"

    # Limit output to first 30 lines
    lines = result.stdout.splitlines()[:30]
    return "\n".join(lines) if lines else "[ERROR: no blame output]"

async def get_commit_history(self, path: str, limit: int = 5) -> str:
    """Return the last *limit* one-line commits touching *path*.

    Runs ``git log --oneline`` in the repo root; *limit* is clamped
    to the range 1..50.
    """
    if not path or not path.strip():
        return "[ERROR: empty path]"

    abs_path = self._resolve_path(path)
    if abs_path is None:
        return "[ERROR: invalid path]"

    if not os.path.exists(abs_path):
        return "[ERROR: file not found]"

    # Keep the requested count within a sane 1..50 window.
    capped = min(max(limit, 1), 50)

    cmd = ["git", "log", "--oneline", f"-n{capped}", "--", abs_path]

    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=10,
            cwd=self.root_path,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: log timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    if proc.returncode != 0:
        return "[ERROR: git log failed]"

    history = proc.stdout.splitlines()
    return "\n".join(history) if history else "[ERROR: no commit history found]"

async def get_related_issues(self, query_text: str) -> str:
    """Search commit messages for text related to *query_text*.

    GitHub issues are not reachable in local mode, so git log messages
    are searched instead. Returns up to 20 matching one-line commits or
    an ``[ERROR: ...]`` string.
    """
    if not query_text or not query_text.strip():
        return "[ERROR: empty query]"

    query_text = query_text.strip()

    # --fixed-strings keeps the query literal; git's --grep otherwise
    # interprets it as a regex, so a query like "fix(parser)" would
    # match the wrong commits or none at all.
    args = ["git", "log", "--oneline", "--all", "--fixed-strings", "--grep", query_text]

    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=10,
            cwd=self.root_path,
        )
    except subprocess.TimeoutExpired:
        return "[ERROR: search timeout]"
    except Exception as e:
        return f"[ERROR: {str(e)[:50]}]"

    if result.returncode != 0:
        return f"[ERROR: no matching commits found for '{query_text}']"

    lines = result.stdout.splitlines()[:20]  # Limit to 20 results
    if not lines:
        return f"[ERROR: no matching commits found for '{query_text}']"

    return f"Found {len(lines)} matching commits:\n" + "\n".join(lines)

async def close(self):
    """Release resources — a no-op here, since local tools hold no HTTP client."""
    return None

def format_source(self, path: str, content: str | None = None, needle: str | None = None) -> str:
"""Format a source citation as local:path#Lx-Ly."""
line_range = ""
Expand Down
Loading