-
Notifications
You must be signed in to change notification settings - Fork 39
Add 8 deep code understanding and GitHub context tools for RLM agent #6
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: add-local-folder-support
Are you sure you want to change the base?
Changes from 4 commits
356a44d
0c0e4f0
c21a385
6bbebb6
c44e271
eeb91aa
50a5c2f
406f323
e5cd55f
6741fc2
fe67e0a
1aba8aa
7813c57
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -197,10 +197,388 @@ async def search_code(self, query: str) -> list[dict[str, Any]]: | |||||
|
|
||||||
| return results | ||||||
|
|
||||||
| async def get_symbol_definition(self, symbol: str, context_file: str = "") -> str: | ||||||
| """Find the definition of a symbol (function or class). | ||||||
|
|
||||||
| Uses grep to find 'def symbol' or 'class symbol' patterns. | ||||||
| Returns path + snippet or error message. | ||||||
| """ | ||||||
| if not symbol or not symbol.strip(): | ||||||
| return "[ERROR: empty symbol]" | ||||||
|
|
||||||
| symbol = symbol.strip() | ||||||
|
|
||||||
| # Build grep command to find definitions | ||||||
| args = ["grep", "-rn"] | ||||||
| for ext in SEARCH_EXTENSIONS: | ||||||
| args.append(f"--include=*{ext}") | ||||||
| args.append("--") | ||||||
| # Search for "def symbol" or "class symbol" | ||||||
| args.append(f"(def|class) {symbol}") | ||||||
| args.append(self.root_path) | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: search timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return f"[ERROR: symbol '{symbol}' not found]" | ||||||
|
|
||||||
| # Parse first match | ||||||
| lines = result.stdout.splitlines() | ||||||
| if not lines: | ||||||
| return f"[ERROR: symbol '{symbol}' not found]" | ||||||
|
|
||||||
| first_match = lines[0] | ||||||
| parts = first_match.split(":", 2) | ||||||
| if len(parts) >= 3: | ||||||
| file_path = parts[0] | ||||||
| line_num = parts[1] | ||||||
| rel_path = os.path.relpath(file_path, self.root_path) | ||||||
| snippet = parts[2][:200] | ||||||
| return f"local:{rel_path}#L{line_num}\n{snippet}" | ||||||
|
|
||||||
| return f"[ERROR: could not parse definition]" | ||||||
|
|
||||||
| async def find_usages(self, symbol: str, scope_path: str = ".") -> str: | ||||||
| """Find all usages of a symbol in the codebase. | ||||||
|
|
||||||
| Uses grep to find references. Returns formatted list of matches. | ||||||
| """ | ||||||
| if not symbol or not symbol.strip(): | ||||||
| return "[ERROR: empty symbol]" | ||||||
|
|
||||||
| symbol = symbol.strip() | ||||||
|
|
||||||
| # Resolve scope path | ||||||
| scope_abs = self._resolve_path(scope_path) if scope_path != "." else self.root_path | ||||||
| if scope_abs is None: | ||||||
| scope_abs = self.root_path | ||||||
|
|
||||||
| # Build grep command | ||||||
| args = ["grep", "-rn"] | ||||||
| for ext in SEARCH_EXTENSIONS: | ||||||
| args.append(f"--include=*{ext}") | ||||||
| args.append("--") | ||||||
| args.append(symbol) | ||||||
| args.append(scope_abs) | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: search timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return f"[ERROR: no usages found for '{symbol}']" | ||||||
|
|
||||||
| # Format results | ||||||
| results = [] | ||||||
| for line in result.stdout.splitlines()[:20]: # Limit to 20 results | ||||||
| parts = line.split(":", 2) | ||||||
| if len(parts) >= 3: | ||||||
| file_path = parts[0] | ||||||
| line_num = parts[1] | ||||||
| rel_path = os.path.relpath(file_path, self.root_path) | ||||||
| snippet = parts[2][:100] | ||||||
| results.append(f" {rel_path}:{line_num} {snippet}") | ||||||
|
|
||||||
| if not results: | ||||||
| return f"[ERROR: no usages found for '{symbol}']" | ||||||
|
|
||||||
| return f"Found {len(results)} usages of '{symbol}':\n" + "\n".join(results) | ||||||
|
|
||||||
| async def get_type_hierarchy(self, class_name: str) -> str: | ||||||
| """Get the type hierarchy (parent classes) for a class. | ||||||
|
|
||||||
| Finds class definition and parses parent classes. | ||||||
| """ | ||||||
| if not class_name or not class_name.strip(): | ||||||
| return "[ERROR: empty class name]" | ||||||
|
|
||||||
| class_name = class_name.strip() | ||||||
|
|
||||||
| # Find class definition | ||||||
| args = ["grep", "-rn"] | ||||||
| for ext in SEARCH_EXTENSIONS: | ||||||
| args.append(f"--include=*{ext}") | ||||||
| args.append("--") | ||||||
| args.append(f"class {class_name}") | ||||||
| args.append(self.root_path) | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: search timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return f"[ERROR: class '{class_name}' not found]" | ||||||
|
|
||||||
| # Parse first match to extract parent classes | ||||||
| lines = result.stdout.splitlines() | ||||||
| if not lines: | ||||||
| return f"[ERROR: class '{class_name}' not found]" | ||||||
|
|
||||||
| first_match = lines[0] | ||||||
| parts = first_match.split(":", 2) | ||||||
| if len(parts) < 3: | ||||||
| return f"[ERROR: could not parse class definition]" | ||||||
|
|
||||||
| file_path = parts[0] | ||||||
| line_num = parts[1] | ||||||
| definition = parts[2] | ||||||
| rel_path = os.path.relpath(file_path, self.root_path) | ||||||
|
|
||||||
| # Extract parent classes from "class X(Parent1, Parent2):" pattern | ||||||
| import re | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||
| match = re.search(r"class\s+\w+\s*\((.*?)\)", definition) | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The regular expression
Suggested change
|
||||||
| parents = [] | ||||||
| if match: | ||||||
| parent_str = match.group(1) | ||||||
| parents = [p.strip() for p in parent_str.split(",")] | ||||||
|
|
||||||
| hierarchy = f"local:{rel_path}#L{line_num}\n" | ||||||
| hierarchy += f"class {class_name}" | ||||||
| if parents: | ||||||
| hierarchy += f"({', '.join(parents)})" | ||||||
| hierarchy += ":" | ||||||
|
|
||||||
| return hierarchy | ||||||
|
|
||||||
| async def get_call_graph(self, func_name: str, depth: int = 1) -> str: | ||||||
| """Get the call graph for a function (functions it calls and callers). | ||||||
|
|
||||||
| Limited to depth 1 for performance. | ||||||
| """ | ||||||
| if not func_name or not func_name.strip(): | ||||||
| return "[ERROR: empty function name]" | ||||||
|
|
||||||
| func_name = func_name.strip() | ||||||
|
|
||||||
| # Find function definition | ||||||
| args = ["grep", "-rn"] | ||||||
| for ext in SEARCH_EXTENSIONS: | ||||||
| args.append(f"--include=*{ext}") | ||||||
| args.append("--") | ||||||
| args.append(f"def {func_name}") | ||||||
| args.append(self.root_path) | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: search timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return f"[ERROR: function '{func_name}' not found]" | ||||||
|
|
||||||
| lines = result.stdout.splitlines() | ||||||
| if not lines: | ||||||
| return f"[ERROR: function '{func_name}' not found]" | ||||||
|
|
||||||
| first_match = lines[0] | ||||||
| parts = first_match.split(":", 2) | ||||||
| if len(parts) < 3: | ||||||
| return f"[ERROR: could not parse function definition]" | ||||||
|
|
||||||
| file_path = parts[0] | ||||||
| line_num = parts[1] | ||||||
| rel_path = os.path.relpath(file_path, self.root_path) | ||||||
|
|
||||||
| # Find callers of this function | ||||||
| caller_args = ["grep", "-rn"] | ||||||
| for ext in SEARCH_EXTENSIONS: | ||||||
| caller_args.append(f"--include=*{ext}") | ||||||
| caller_args.append("--") | ||||||
| caller_args.append(f"{func_name}(") | ||||||
| caller_args.append(self.root_path) | ||||||
|
|
||||||
| callers = [] | ||||||
| try: | ||||||
| caller_result = subprocess.run( | ||||||
| caller_args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| ) | ||||||
| if caller_result.returncode == 0: | ||||||
| for line in caller_result.stdout.splitlines()[:10]: | ||||||
| parts = line.split(":", 2) | ||||||
| if len(parts) >= 3: | ||||||
| caller_file = parts[0] | ||||||
| caller_line = parts[1] | ||||||
| caller_rel = os.path.relpath(caller_file, self.root_path) | ||||||
| callers.append(f" {caller_rel}:{caller_line}") | ||||||
| except Exception: | ||||||
| pass # Soft fail on caller search | ||||||
|
|
||||||
| result_str = f"local:{rel_path}#L{line_num}\ndef {func_name}(...)" | ||||||
| if callers: | ||||||
| result_str += f"\n\nCallers ({len(callers)}):\n" + "\n".join(callers) | ||||||
| else: | ||||||
| result_str += "\n\nNo callers found" | ||||||
|
|
||||||
| return result_str | ||||||
|
|
||||||
| async def get_pr_comments(self, pr_number: int) -> str: | ||||||
| """Get PR comments (not available in local mode).""" | ||||||
| return "[Not available for local repos — use --url mode]" | ||||||
|
|
||||||
| async def get_blame(self, path: str, line_range: str = "") -> str: | ||||||
| """Get git blame information for a file or line range. | ||||||
|
|
||||||
| Uses 'git blame' subprocess. Line range format: "10,20" for lines 10-20. | ||||||
| """ | ||||||
| if not path or not path.strip(): | ||||||
| return "[ERROR: empty path]" | ||||||
|
|
||||||
| abs_path = self._resolve_path(path) | ||||||
| if abs_path is None: | ||||||
| return "[ERROR: invalid path]" | ||||||
|
|
||||||
| if not os.path.exists(abs_path): | ||||||
| return "[ERROR: file not found]" | ||||||
|
|
||||||
| # Build git blame command | ||||||
| args = ["git", "blame"] | ||||||
| if line_range: | ||||||
| # Parse line range "start,end" | ||||||
| try: | ||||||
| parts = line_range.split(",") | ||||||
| if len(parts) == 2: | ||||||
| start, end = parts[0].strip(), parts[1].strip() | ||||||
| args.append(f"-L{start},{end}") | ||||||
| except Exception: | ||||||
| pass # Ignore malformed line range | ||||||
|
|
||||||
| args.append(abs_path) | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| cwd=self.root_path, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: blame timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return f"[ERROR: git blame failed]" | ||||||
|
|
||||||
| # Limit output to first 30 lines | ||||||
| lines = result.stdout.splitlines()[:30] | ||||||
| return "\n".join(lines) if lines else "[ERROR: no blame output]" | ||||||
|
|
||||||
| async def get_commit_history(self, path: str, limit: int = 5) -> str: | ||||||
| """Get commit history for a file. | ||||||
|
|
||||||
| Uses 'git log --oneline' subprocess. | ||||||
| """ | ||||||
| if not path or not path.strip(): | ||||||
| return "[ERROR: empty path]" | ||||||
|
|
||||||
| abs_path = self._resolve_path(path) | ||||||
| if abs_path is None: | ||||||
| return "[ERROR: invalid path]" | ||||||
|
|
||||||
| if not os.path.exists(abs_path): | ||||||
| return "[ERROR: file not found]" | ||||||
|
|
||||||
| # Clamp limit | ||||||
| limit = max(1, min(limit, 50)) | ||||||
|
|
||||||
| # Build git log command | ||||||
| args = ["git", "log", "--oneline", f"-n{limit}", "--", abs_path] | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| cwd=self.root_path, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: log timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return "[ERROR: git log failed]" | ||||||
|
|
||||||
| lines = result.stdout.splitlines() | ||||||
| if not lines: | ||||||
| return "[ERROR: no commit history found]" | ||||||
|
|
||||||
| return "\n".join(lines) | ||||||
|
|
||||||
| async def get_related_issues(self, query_text: str) -> str: | ||||||
| """Search for related issues in git log (not available in local mode for GitHub issues).""" | ||||||
| if not query_text or not query_text.strip(): | ||||||
| return "[ERROR: empty query]" | ||||||
|
|
||||||
| query_text = query_text.strip() | ||||||
|
|
||||||
| # Search git log messages for the query | ||||||
| args = ["git", "log", "--oneline", "--all", "--grep", query_text] | ||||||
|
|
||||||
| try: | ||||||
| result = subprocess.run( | ||||||
| args, | ||||||
| capture_output=True, | ||||||
| text=True, | ||||||
| timeout=10, | ||||||
| cwd=self.root_path, | ||||||
| ) | ||||||
| except subprocess.TimeoutExpired: | ||||||
| return "[ERROR: search timeout]" | ||||||
| except Exception as e: | ||||||
| return f"[ERROR: {str(e)[:50]}]" | ||||||
|
|
||||||
| if result.returncode != 0: | ||||||
| return f"[ERROR: no matching commits found for '{query_text}']" | ||||||
|
|
||||||
| lines = result.stdout.splitlines()[:20] # Limit to 20 results | ||||||
| if not lines: | ||||||
| return f"[ERROR: no matching commits found for '{query_text}']" | ||||||
|
|
||||||
| return f"Found {len(lines)} matching commits:\n" + "\n".join(lines) | ||||||
|
|
||||||
| async def close(self): | ||||||
| """No-op for local tools (no HTTP client to close).""" | ||||||
| pass | ||||||
|
|
||||||
| def format_source(self, path: str, content: str | None = None, needle: str | None = None) -> str: | ||||||
| """Format a source citation as local:path#Lx-Ly.""" | ||||||
| line_range = "" | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
grepcommand is missing the-Eflag for extended regular expressions. The pattern(def|class) {symbol}on line 217 uses(and|which are special characters in extended regex. Without-E,grepwill perform a literal string search for(def|class) ..., which will cause the symbol definition search to fail.