From 7a698cf8f7b1920197adcdadf4b80658afc21d20 Mon Sep 17 00:00:00 2001 From: mambo-wang Date: Thu, 18 Jun 2026 15:19:01 +0800 Subject: [PATCH 1/5] =?UTF-8?q?AI=20IDE=E9=A9=B1=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- codewiki/mcp/server.py | 497 +++++++++++++++++++++------- codewiki/mcp/session.py | 92 +++++ codewiki/mcp/tools/__init__.py | 6 + codewiki/mcp/tools/analysis.py | 112 +++++++ codewiki/mcp/tools/code_reader.py | 107 ++++++ codewiki/mcp/tools/doc_writer.py | 167 ++++++++++ codewiki/mcp/tools/module_tree.py | 133 ++++++++ codewiki/mcp/tools/prompt_server.py | 176 ++++++++++ 8 files changed, 1170 insertions(+), 120 deletions(-) create mode 100644 codewiki/mcp/session.py create mode 100644 codewiki/mcp/tools/__init__.py create mode 100644 codewiki/mcp/tools/analysis.py create mode 100644 codewiki/mcp/tools/code_reader.py create mode 100644 codewiki/mcp/tools/doc_writer.py create mode 100644 codewiki/mcp/tools/module_tree.py create mode 100644 codewiki/mcp/tools/prompt_server.py diff --git a/codewiki/mcp/server.py b/codewiki/mcp/server.py index d54539ea..20d6abb3 100644 --- a/codewiki/mcp/server.py +++ b/codewiki/mcp/server.py @@ -1,16 +1,27 @@ """ CodeWiki MCP Server. 
-Exposes documentation generation as MCP tools: - - generate_docs: Generate full documentation for a repository - - analyze_repo: Analyze repository structure and dependencies - - get_module_tree: Get the module clustering for a repository +Provides two sets of tools: + +**Fine-grained tools (IDE-driven, zero LLM config):** + - ``analyze_repo`` — Parse a repo and build a dependency graph (session-based) + - ``read_code_components`` — Read source code for given component IDs + - ``view_repo_file`` — Read-only file/directory browsing + - ``write_doc_file`` — Create a documentation .md file with Mermaid validation + - ``edit_doc_file`` — Edit a documentation file (str_replace / insert / undo) + - ``save_module_tree`` — Persist IDE agent's module clustering + - ``get_processing_order`` — Get leaf-first documentation order + - ``get_prompt`` — Retrieve CodeWiki's prompt templates + - ``close_session`` — Clean up a session + +**Legacy tools (require CodeWiki LLM config):** + - ``generate_docs`` — Full documentation generation (black-box) + - ``get_module_tree`` — Retrieve existing module clustering Usage: - # Run as standalone MCP server (stdio transport) python -m codewiki.mcp.server - # Or register in your MCP client config: + # Cursor / Claude Desktop config: { "mcpServers": { "codewiki": { @@ -30,54 +41,49 @@ from mcp.server import Server from mcp.server.stdio import stdio_server -from mcp.types import ( - TextContent, - Tool, -) +from mcp.types import TextContent, Tool + +from codewiki.mcp.session import SessionStore logger = logging.getLogger(__name__) -# Create the MCP server +# --------------------------------------------------------------------------- +# Global session store (lives for the lifetime of the MCP server process) +# --------------------------------------------------------------------------- +_store = SessionStore() + +# --------------------------------------------------------------------------- +# MCP Server instance +# 
--------------------------------------------------------------------------- server = Server("codewiki") -def _load_config(): - """Load CodeWiki configuration from ~/.codewiki/config.json + keyring.""" - from codewiki.cli.config_manager import ConfigManager - manager = ConfigManager() - if not manager.load(): - raise RuntimeError( - "CodeWiki not configured. Run 'codewiki config set' first." - ) - return manager - +# =================================================================== +# Tool definitions +# =================================================================== -@server.list_tools() -async def list_tools() -> list[Tool]: - """List available CodeWiki MCP tools.""" +def _fine_grained_tools() -> list[Tool]: + """Return the zero-config, IDE-driven tool set.""" return [ Tool( - name="generate_docs", + name="analyze_repo", description=( - "Generate comprehensive AI-powered documentation for a code repository. " - "Analyzes dependencies, clusters modules, and generates markdown documentation." + "Analyze a code repository's structure, dependencies, and components " + "using Tree-sitter AST parsing. Returns a component index and leaf nodes. " + "No LLM required. This is the entry point for the wiki generation pipeline. " + "After calling this, use get_prompt('cluster') to learn clustering rules, " + "then save_module_tree to persist your grouping." 
), inputSchema={ "type": "object", "properties": { "repo_path": { "type": "string", - "description": "Absolute path to the repository to document", + "description": "Absolute path to the repository to analyze", }, "output_dir": { "type": "string", - "description": "Output directory for generated docs (default: ./docs)", - "default": "docs", - }, - "doc_type": { - "type": "string", - "enum": ["api", "architecture", "user-guide", "developer"], - "description": "Type of documentation to generate", + "description": "Output directory for generated docs (default: /docs)", }, "include_patterns": { "type": "string", @@ -92,18 +98,242 @@ async def list_tools() -> list[Tool]: }, ), Tool( - name="analyze_repo", + name="read_code_components", + description=( + "Read the source code for a list of component IDs. " + "Component IDs have the form 'file_path::ComponentName'. " + "Returns the source code with language-aware code fences." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "component_ids": { + "type": "array", + "items": {"type": "string"}, + "description": "List of component IDs to read", + }, + }, + "required": ["session_id", "component_ids"], + }, + ), + Tool( + name="view_repo_file", + description=( + "Read-only view of a file or directory inside the analyzed repository. " + "Use this to explore code that isn't in the component index." 
+ ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "path": { + "type": "string", + "description": "Relative path within the repository", + }, + "view_range": { + "type": "array", + "items": {"type": "integer"}, + "description": "Optional [start_line, end_line] (1-indexed, -1 for end)", + }, + }, + "required": ["session_id", "path"], + }, + ), + Tool( + name="write_doc_file", + description=( + "Create a new markdown documentation file in the output directory. " + "Automatically validates Mermaid diagrams after writing." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "filename": { + "type": "string", + "description": "Filename for the doc (e.g., 'auth_module.md')", + }, + "content": { + "type": "string", + "description": "Markdown content to write", + }, + }, + "required": ["session_id", "filename", "content"], + }, + ), + Tool( + name="edit_doc_file", + description=( + "Edit an existing documentation file. Supports str_replace (find-and-replace), " + "insert (add text at a line), and undo (revert last edit). " + "Automatically validates Mermaid diagrams after editing." 
+ ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "filename": { + "type": "string", + "description": "Filename of the doc to edit", + }, + "command": { + "type": "string", + "enum": ["str_replace", "insert", "undo"], + "description": "Edit command to run", + }, + "old_str": { + "type": "string", + "description": "String to find (required for str_replace)", + }, + "new_str": { + "type": "string", + "description": "Replacement string (for str_replace/insert)", + }, + "insert_line": { + "type": "integer", + "description": "Line number for insert (0-indexed)", + }, + }, + "required": ["session_id", "filename", "command"], + }, + ), + Tool( + name="save_module_tree", + description=( + "Save the IDE agent's module clustering result. " + "Accepts a JSON module tree and persists it to disk. " + "Returns the recommended leaf-first processing order." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + "module_tree": { + "type": "object", + "description": ( + "Module tree dict. Each key is a module name with value " + "{'components': [component_ids], 'children': {nested modules}}" + ), + }, + }, + "required": ["session_id", "module_tree"], + }, + ), + Tool( + name="get_processing_order", + description=( + "Get the leaf-first processing order for documentation generation. " + "Process leaf modules (is_leaf=true) before parent modules." + ), + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID from analyze_repo", + }, + }, + "required": ["session_id"], + }, + ), + Tool( + name="get_prompt", description=( - "Analyze a repository's structure, dependencies, and component hierarchy " - "without generating full documentation. Returns file counts, languages, " - "and dependency information." 
+ "Retrieve CodeWiki's prompt templates for each pipeline stage. " + "Available types: cluster, system_complex, system_leaf, user, " + "overview_module, overview_repo. Optionally pass variables to " + "fill in template placeholders." + ), + inputSchema={ + "type": "object", + "properties": { + "prompt_type": { + "type": "string", + "enum": [ + "cluster", + "system_complex", + "system_leaf", + "user", + "overview_module", + "overview_repo", + ], + "description": "Which prompt template to retrieve", + }, + "variables": { + "type": "object", + "description": "Optional template variables to fill in", + }, + }, + "required": ["prompt_type"], + }, + ), + Tool( + name="close_session", + description="Close and clean up an analysis session to free memory.", + inputSchema={ + "type": "object", + "properties": { + "session_id": { + "type": "string", + "description": "Session ID to close", + }, + }, + "required": ["session_id"], + }, + ), + ] + + +def _legacy_tools() -> list[Tool]: + """Return the legacy tools that require CodeWiki LLM configuration.""" + return [ + Tool( + name="generate_docs", + description=( + "[LEGACY — requires 'codewiki config set' first] " + "Generate full documentation for a repository in one shot. " + "For IDE-driven generation, use the fine-grained tools instead." 
), inputSchema={ "type": "object", "properties": { "repo_path": { "type": "string", - "description": "Absolute path to the repository to analyze", + "description": "Absolute path to the repository to document", + }, + "output_dir": { + "type": "string", + "description": "Output directory for generated docs (default: ./docs)", + "default": "docs", + }, + "doc_type": { + "type": "string", + "enum": ["api", "architecture", "user-guide", "developer"], + "description": "Type of documentation to generate", + }, + "include_patterns": { + "type": "string", + "description": "Comma-separated file patterns to include", + }, + "exclude_patterns": { + "type": "string", + "description": "Comma-separated patterns to exclude", }, }, "required": ["repo_path"], @@ -111,10 +341,7 @@ async def list_tools() -> list[Tool]: ), Tool( name="get_module_tree", - description=( - "Get the module clustering tree for a repository. " - "Shows how source files are grouped into logical modules." - ), + description="Get the existing module clustering tree for a repository.", inputSchema={ "type": "object", "properties": { @@ -134,32 +361,101 @@ async def list_tools() -> list[Tool]: ] +# =================================================================== +# Tool dispatch +# =================================================================== + +@server.list_tools() +async def list_tools() -> list[Tool]: + """List all available CodeWiki MCP tools.""" + return _fine_grained_tools() + _legacy_tools() + + @server.call_tool() async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]: - """Handle MCP tool calls.""" + """Route tool calls to the appropriate handler.""" try: - if name == "generate_docs": - return await _handle_generate_docs(arguments) - elif name == "analyze_repo": - return await _handle_analyze_repo(arguments) + # --- Fine-grained tools (no LLM config needed) --- + if name == "analyze_repo": + from codewiki.mcp.tools.analysis import handle_analyze_repo + return 
[_text(handle_analyze_repo(arguments, _store))] + + elif name == "read_code_components": + from codewiki.mcp.tools.code_reader import handle_read_code_components + return [_text(handle_read_code_components(arguments, _store))] + + elif name == "view_repo_file": + from codewiki.mcp.tools.code_reader import handle_view_repo_file + return [_text(handle_view_repo_file(arguments, _store))] + + elif name == "write_doc_file": + from codewiki.mcp.tools.doc_writer import handle_write_doc_file + result = await handle_write_doc_file(arguments, _store) + return [_text(result)] + + elif name == "edit_doc_file": + from codewiki.mcp.tools.doc_writer import handle_edit_doc_file + result = await handle_edit_doc_file(arguments, _store) + return [_text(result)] + + elif name == "save_module_tree": + from codewiki.mcp.tools.module_tree import handle_save_module_tree + return [_text(handle_save_module_tree(arguments, _store))] + + elif name == "get_processing_order": + from codewiki.mcp.tools.module_tree import handle_get_processing_order + return [_text(handle_get_processing_order(arguments, _store))] + + elif name == "get_prompt": + from codewiki.mcp.tools.prompt_server import handle_get_prompt + return [_text(handle_get_prompt(arguments, _store))] + + elif name == "close_session": + sid = arguments["session_id"] + removed = _store.remove(sid) + return [_text(json.dumps({ + "status": "closed" if removed else "not_found", + "session_id": sid, + }))] + + # --- Legacy tools (require CodeWiki LLM config) --- + elif name == "generate_docs": + return await _legacy_generate_docs(arguments) + elif name == "get_module_tree": - return await _handle_get_module_tree(arguments) + return await _legacy_get_module_tree(arguments) + else: - return [TextContent(type="text", text=f"Unknown tool: {name}")] + return [_text(json.dumps({"error": f"Unknown tool: {name}"}))] + except Exception as e: logger.error("Tool %s failed: %s", name, e, exc_info=True) - return [TextContent(type="text", text=f"Error: 
{e}")] + return [_text(json.dumps({"error": str(e)}))] + + +# =================================================================== +# Legacy tool handlers (require _load_config) +# =================================================================== + +def _load_config(): + """Load CodeWiki configuration from ~/.codewiki/config.json + keyring.""" + from codewiki.cli.config_manager import ConfigManager + manager = ConfigManager() + if not manager.load(): + raise RuntimeError( + "CodeWiki not configured. Run 'codewiki config set' first." + ) + return manager -async def _handle_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: - """Handle generate_docs tool call.""" +async def _legacy_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: + """Legacy generate_docs — requires CodeWiki LLM configuration.""" repo_path = Path(arguments["repo_path"]).expanduser().resolve() output_dir = Path(arguments.get("output_dir", "docs")).expanduser().resolve() if not repo_path.exists(): - return [TextContent(type="text", text=f"Repository not found: {repo_path}")] + return [_text(json.dumps({"error": f"Repository not found: {repo_path}"}))] - # Load config manager = _load_config() config = manager.get_config() api_key = manager.get_api_key() @@ -167,9 +463,8 @@ async def _handle_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: from codewiki.src.be.backend import is_caw_provider caw_mode = bool(config) and is_caw_provider(getattr(config, "provider", "")) if not api_key and not caw_mode: - return [TextContent(type="text", text="API key not configured. Run 'codewiki config set --api-key '")] + return [_text(json.dumps({"error": "API key not configured. 
Run 'codewiki config set --api-key '"}))] - # Build agent instructions from arguments agent_instructions = {} if arguments.get("doc_type"): agent_instructions["doc_type"] = arguments["doc_type"] @@ -197,11 +492,8 @@ async def _handle_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: from codewiki.src.be.documentation_generator import DocumentationGenerator doc_gen = DocumentationGenerator(backend_config) - - # Run generation await doc_gen.run() - # Collect results generated_files = [] for f in output_dir.iterdir(): if f.suffix in (".md", ".json", ".html"): @@ -213,74 +505,23 @@ async def _handle_generate_docs(arguments: dict[str, Any]) -> list[TextContent]: "files_generated": sorted(generated_files), "file_count": len(generated_files), } - return [TextContent(type="text", text=json.dumps(result, indent=2))] + return [_text(json.dumps(result, indent=2))] -async def _handle_analyze_repo(arguments: dict[str, Any]) -> list[TextContent]: - """Handle analyze_repo tool call — lightweight dependency analysis only.""" - repo_path = Path(arguments["repo_path"]).expanduser().resolve() - - if not repo_path.exists(): - return [TextContent(type="text", text=f"Repository not found: {repo_path}")] - - manager = _load_config() - config = manager.get_config() - api_key = manager.get_api_key() - - from codewiki.src.config import Config as BackendConfig, set_cli_context - set_cli_context(True) - - # Create a minimal backend config (no LLM calls needed for analysis) - backend_config = BackendConfig.from_cli( - repo_path=str(repo_path), - output_dir=str(repo_path / ".codewiki_temp"), - llm_base_url=config.base_url or "http://localhost", - llm_api_key=api_key or "not-needed", - main_model=config.main_model or "unused", - cluster_model=config.cluster_model or "unused", - fallback_model=config.fallback_model or "unused", - ) - - from codewiki.src.be.dependency_analyzer import DependencyGraphBuilder - graph_builder = DependencyGraphBuilder(backend_config) - components, 
leaf_nodes = graph_builder.build_dependency_graph() - - # Aggregate statistics - languages = {} - files = set() - for comp in components.values(): - lang = getattr(comp, "language", "unknown") - languages[lang] = languages.get(lang, 0) + 1 - files.add(getattr(comp, "relative_path", "")) - - result = { - "status": "success", - "repo_path": str(repo_path), - "total_components": len(components), - "total_files": len(files), - "leaf_nodes": len(leaf_nodes), - "languages": languages, - "sample_components": sorted(list(components.keys()))[:20], - } - return [TextContent(type="text", text=json.dumps(result, indent=2))] - - -async def _handle_get_module_tree(arguments: dict[str, Any]) -> list[TextContent]: - """Handle get_module_tree tool call — returns existing module tree.""" +async def _legacy_get_module_tree(arguments: dict[str, Any]) -> list[TextContent]: + """Legacy get_module_tree.""" repo_path = Path(arguments["repo_path"]).expanduser().resolve() output_dir = Path(arguments.get("output_dir", "docs")).expanduser().resolve() module_tree_path = output_dir / "module_tree.json" if not module_tree_path.exists(): - return [TextContent( - type="text", - text=f"Module tree not found at {module_tree_path}. Run 'codewiki generate' first." - )] + return [_text(json.dumps({ + "error": f"Module tree not found at {module_tree_path}. Run 'codewiki generate' first." 
+ }))] module_tree = json.loads(module_tree_path.read_text()) def _summarize_tree(tree, depth=0): - """Create a readable summary of the module tree.""" lines = [] for name, info in tree.items(): indent = " " * depth @@ -299,13 +540,29 @@ def _summarize_tree(tree, depth=0): "total_modules": len(module_tree), "tree_summary": summary, } - return [TextContent(type="text", text=json.dumps(result, indent=2))] + return [_text(json.dumps(result, indent=2))] + + +# =================================================================== +# Helpers +# =================================================================== +def _text(content: str) -> TextContent: + return TextContent(type="text", text=content) + + +# =================================================================== +# Entry point +# =================================================================== async def main(): """Run the MCP server with stdio transport.""" async with stdio_server() as (read_stream, write_stream): - await server.run(read_stream, write_stream, server.create_initialization_options()) + await server.run( + read_stream, + write_stream, + server.create_initialization_options(), + ) if __name__ == "__main__": diff --git a/codewiki/mcp/session.py b/codewiki/mcp/session.py new file mode 100644 index 00000000..ca7d8f53 --- /dev/null +++ b/codewiki/mcp/session.py @@ -0,0 +1,92 @@ +"""Session state management for the CodeWiki MCP Server. + +Each ``analyze_repo`` call creates a new session that caches the analysis +results (components, leaf nodes, etc.) in memory. Subsequent tool calls +reference the session by ``session_id`` to read code, write docs, and +manage the module tree without re-parsing the repository. 
+""" + +from __future__ import annotations + +import time +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Any, Dict, List, Optional + +from codewiki.src.be.dependency_analyzer.models.core import Node + + +# Sessions auto-expire after this many seconds of inactivity. +_SESSION_TTL_SECONDS = 2 * 60 * 60 # 2 hours + + +@dataclass +class SessionState: + """Mutable state shared across all MCP tool calls within a session.""" + + session_id: str + repo_path: str + output_dir: str + components: Dict[str, Node] + leaf_nodes: List[str] + module_tree: Dict[str, Any] = field(default_factory=dict) + registry: Dict[str, Any] = field(default_factory=dict) + created_at: float = field(default_factory=time.time) + last_accessed: float = field(default_factory=time.time) + + def touch(self) -> None: + """Update the last-accessed timestamp.""" + self.last_accessed = time.time() + + @property + def is_expired(self) -> bool: + return (time.time() - self.last_accessed) > _SESSION_TTL_SECONDS + + +class SessionStore: + """In-memory store for all active MCP sessions.""" + + def __init__(self) -> None: + self._sessions: Dict[str, SessionState] = {} + + def create( + self, + repo_path: str, + output_dir: str, + components: Dict[str, Node], + leaf_nodes: List[str], + ) -> SessionState: + """Create a new session and return it.""" + session_id = uuid.uuid4().hex[:12] + state = SessionState( + session_id=session_id, + repo_path=repo_path, + output_dir=output_dir, + components=components, + leaf_nodes=leaf_nodes, + ) + self._sessions[session_id] = state + self._purge_expired() + return state + + def get(self, session_id: str) -> Optional[SessionState]: + """Return the session or ``None`` if not found / expired.""" + state = self._sessions.get(session_id) + if state is None: + return None + if state.is_expired: + del self._sessions[session_id] + return None + state.touch() + return state + + def remove(self, session_id: str) -> bool: + """Remove 
a session. Returns True if it existed.""" + return self._sessions.pop(session_id, None) is not None + + def _purge_expired(self) -> None: + """Remove all expired sessions.""" + expired = [sid for sid, s in self._sessions.items() if s.is_expired] + for sid in expired: + del self._sessions[sid] diff --git a/codewiki/mcp/tools/__init__.py b/codewiki/mcp/tools/__init__.py new file mode 100644 index 00000000..62ea687d --- /dev/null +++ b/codewiki/mcp/tools/__init__.py @@ -0,0 +1,6 @@ +"""CodeWiki MCP Tools package. + +Each module in this package implements one or more MCP tools that operate +on a :class:`~codewiki.mcp.session.SessionState`. The tools are registered +by the MCP server in ``codewiki/mcp/server.py``. +""" diff --git a/codewiki/mcp/tools/analysis.py b/codewiki/mcp/tools/analysis.py new file mode 100644 index 00000000..41c8db90 --- /dev/null +++ b/codewiki/mcp/tools/analysis.py @@ -0,0 +1,112 @@ +"""MCP tool: analyze_repo — parse a repository and build the dependency graph. + +This is the entry-point tool for the IDE-driven wiki generation pipeline. +It runs CodeWiki's Tree-sitter-based dependency analyzer (no LLM needed), +caches the results in a new session, and returns a component index the IDE +agent can use for clustering and documentation. +""" + +from __future__ import annotations + +import json +import logging +import os +from pathlib import Path +from typing import Any, Dict, List, Tuple + +from codewiki.mcp.session import SessionState, SessionStore + +logger = logging.getLogger(__name__) + + +def _build_component_index(components: Dict[str, Any], max_items: int = 500) -> Tuple[list, bool]: + """Build a lightweight component index for the MCP response. + + Returns (index_list, truncated) where *truncated* is True when the + index was capped at *max_items*. 
+ """ + index: list[dict] = [] + for comp_id, node in list(components.items())[:max_items]: + index.append({ + "id": comp_id, + "type": getattr(node, "component_type", "unknown"), + "file": getattr(node, "relative_path", ""), + "depends_on": list(getattr(node, "depends_on", []))[:20], + }) + return index, len(components) > max_items + + +def handle_analyze_repo( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Run the dependency analysis and return the session + component index.""" + repo_path = Path(arguments["repo_path"]).expanduser().resolve() + if not repo_path.exists(): + return json.dumps({"error": f"Repository not found: {repo_path}"}) + + output_dir = Path(arguments.get("output_dir", str(repo_path / "docs"))).expanduser().resolve() + output_dir.mkdir(parents=True, exist_ok=True) + + # Build a minimal Config for the dependency analyzer (no LLM fields used) + from codewiki.src.config import Config + config = Config( + repo_path=str(repo_path), + output_dir=str(output_dir / "temp"), + dependency_graph_dir=str(output_dir / "temp" / "dependency_graphs"), + docs_dir=str(output_dir), + max_depth=2, + llm_base_url="not-needed", + llm_api_key="not-needed", + main_model="unused", + cluster_model="unused", + ) + + # Apply optional include/exclude patterns + include = arguments.get("include_patterns") + exclude = arguments.get("exclude_patterns") + if include or exclude: + agent_instructions: Dict[str, Any] = {} + if include: + agent_instructions["include_patterns"] = [p.strip() for p in include.split(",")] + if exclude: + agent_instructions["exclude_patterns"] = [p.strip() for p in exclude.split(",")] + config.agent_instructions = agent_instructions + + from codewiki.src.be.dependency_analyzer import DependencyGraphBuilder + builder = DependencyGraphBuilder(config) + components, leaf_nodes = builder.build_dependency_graph() + + session = store.create( + repo_path=str(repo_path), + output_dir=str(output_dir), + components=components, + 
leaf_nodes=leaf_nodes, + ) + + index, truncated = _build_component_index(components) + + # Language stats + languages: Dict[str, int] = {} + for node in components.values(): + lang = getattr(node, "language", "unknown") + languages[lang] = languages.get(lang, 0) + 1 + + result = { + "session_id": session.session_id, + "repo_name": repo_path.name, + "repo_path": str(repo_path), + "output_dir": str(output_dir), + "languages": languages, + "total_components": len(components), + "total_leaf_nodes": len(leaf_nodes), + "leaf_nodes": leaf_nodes[:100], + "component_index": index, + "component_index_truncated": truncated, + "hint": ( + "Use read_code_components(session_id, component_ids) to read source code. " + "Use save_module_tree(session_id, module_tree) after clustering. " + "Call get_prompt('cluster') for clustering rules." + ), + } + return json.dumps(result, indent=2, ensure_ascii=False) diff --git a/codewiki/mcp/tools/code_reader.py b/codewiki/mcp/tools/code_reader.py new file mode 100644 index 00000000..5bce49bd --- /dev/null +++ b/codewiki/mcp/tools/code_reader.py @@ -0,0 +1,107 @@ +"""MCP tools: read_code_components + view_repo_file. + +These are read-only tools that let the IDE agent explore source code +within the analyzed repository. 
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import subprocess
+from pathlib import Path
+from typing import Any, Dict, List, Optional
+
+from codewiki.mcp.session import SessionState, SessionStore
+
+logger = logging.getLogger(__name__)
+
+# Truncation guard for very large responses
+_MAX_RESPONSE_LEN = 32000
+
+
+def _maybe_truncate(text: str, limit: int = _MAX_RESPONSE_LEN) -> str:
+    """Cap *text* at *limit* characters, appending a marker so the cut is visible."""
+    if len(text) <= limit:
+        return text
+    return text[:limit] + "\n\n[response truncated]"
+
+
+def handle_read_code_components(arguments: Dict[str, Any], store: SessionStore) -> str:
+    """Return fenced source code for each component ID; unknown IDs get a heading."""
+    session_id = arguments["session_id"]
+    session = store.get(session_id)
+    if session is None:
+        return json.dumps({"error": f"Session {session_id} not found or expired."})
+
+    component_ids: List[str] = arguments["component_ids"]
+    results = []
+    for cid in component_ids:
+        node = session.components.get(cid)
+        if node is None:
+            results.append(f"# Component {cid} not found\n")
+            continue
+        fence = getattr(node, "language", "") or ""
+        code = getattr(node, "source_code", "").strip()
+        results.append(f"## {cid} ({getattr(node, 'component_type', '')})\n```{fence}\n{code}\n```\n")
+
+    return _maybe_truncate("\n".join(results))
+
+
+def handle_view_repo_file(arguments: Dict[str, Any], store: SessionStore) -> str:
+    """Read-only view of a file or directory inside the repository.
+
+    ``path`` must stay inside the session's repository; anything that resolves
+    outside it (``..`` segments, absolute paths, symlinks) is rejected.
+    """
+    session_id = arguments["session_id"]
+    session = store.get(session_id)
+    if session is None:
+        return json.dumps({"error": f"Session {session_id} not found or expired."})
+
+    rel_path = arguments["path"]
+    repo_root = Path(session.repo_path).resolve()
+    abs_path = (repo_root / rel_path).resolve()
+    try:
+        abs_path.relative_to(repo_root)
+    except ValueError:
+        return json.dumps({"error": f"Path escapes the repository: {rel_path}"})
+
+    if not abs_path.exists():
+        return json.dumps({"error": f"Path not found: {rel_path}"})
+
+    # Directory listing. Argument list + default shell=False: the path comes
+    # from the MCP client and must never be interpreted by a shell.
+    if abs_path.is_dir():
+        out = subprocess.run(
+            ["find", str(abs_path), "-maxdepth", "2", "-not", "-path", "*/.*"],
+            capture_output=True,
+        )
+        listing = out.stdout.decode("utf-8", errors="replace")
+        listing = listing.replace(str(abs_path), rel_path)
+        return f"Directory listing for {rel_path}:\n{listing}"
+
+    # File view
+    try:
+        content = abs_path.read_text(encoding="utf-8", errors="replace")
+    except Exception as e:
+        return json.dumps({"error": f"Cannot read file: {e}"})
+
+    view_range = arguments.get("view_range")
+    lines = content.split("\n")
+
+    if view_range:
+        if len(view_range) != 2:
+            return json.dumps({"error": "view_range must be [start, end]"})
+        start, end = view_range
+        start = max(1, min(start, len(lines)))
+        if end == -1:
+            end = len(lines)
+        end = max(start, min(end, len(lines)))
+        selected = lines[start - 1 : end]
+        numbered = "\n".join(f"{i + start:6}\t{line}" for i, line in enumerate(selected))
+        return _maybe_truncate(f"File: {rel_path} (lines {start}-{end})\n{numbered}")
+
+    numbered = "\n".join(f"{i + 1:6}\t{line}" for i, line in enumerate(lines))
+    return _maybe_truncate(f"File: {rel_path} ({len(lines)} lines)\n{numbered}")
diff --git a/codewiki/mcp/tools/doc_writer.py b/codewiki/mcp/tools/doc_writer.py
new file mode 100644
index 00000000..ce5f35db
--- /dev/null
+++ b/codewiki/mcp/tools/doc_writer.py
@@ -0,0 +1,167 @@
+"""MCP tools: write_doc_file + edit_doc_file.
+
+These tools create and edit markdown documentation files in the output
+directory, with automatic Mermaid diagram validation after every write.
+""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +from pathlib import Path +from typing import Any, Dict, Optional + +from codewiki.mcp.session import SessionState, SessionStore + +logger = logging.getLogger(__name__) + + +async def _validate_mermaid(file_path: str, relative_path: str) -> str: + """Run Mermaid validation and return the result string.""" + try: + from codewiki.src.be.utils import validate_mermaid_diagrams + return await validate_mermaid_diagrams(file_path, relative_path) + except Exception as e: + return f"Mermaid validation skipped: {e}" + + +def _ensure_parent_dirs(path: Path) -> None: + """Create parent directories if they don't exist.""" + path.parent.mkdir(parents=True, exist_ok=True) + + +async def handle_write_doc_file( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Create a new documentation file in the output directory.""" + session_id = arguments["session_id"] + session = store.get(session_id) + if session is None: + return json.dumps({"error": f"Session {session_id} not found or expired."}) + + filename = arguments["filename"] + if not filename.endswith(".md"): + filename += ".md" + content = arguments["content"] + + doc_path = Path(session.output_dir) / filename + _ensure_parent_dirs(doc_path) + + if doc_path.exists(): + return json.dumps({ + "error": f"File already exists: {filename}. Use edit_doc_file to modify it." 
+ }) + + doc_path.write_text(content, encoding="utf-8") + + # Mermaid validation + mermaid_result = await _validate_mermaid(str(doc_path), filename) + + result = { + "status": "created", + "path": str(doc_path), + "filename": filename, + "lines": content.count("\n") + 1, + "mermaid_validation": mermaid_result, + } + return json.dumps(result, indent=2, ensure_ascii=False) + + +async def handle_edit_doc_file( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Edit an existing documentation file (str_replace, insert, or undo).""" + session_id = arguments["session_id"] + session = store.get(session_id) + if session is None: + return json.dumps({"error": f"Session {session_id} not found or expired."}) + + filename = arguments["filename"] + if not filename.endswith(".md"): + filename += ".md" + + doc_path = Path(session.output_dir) / filename + command = arguments["command"] + + if command == "undo": + # Undo via registry history + history_key = str(doc_path) + history = session.registry.get("file_history", "{}") + file_history = json.loads(history) if isinstance(history, str) else history + path_history = file_history.get(history_key, []) + if not path_history: + return json.dumps({"error": f"No edit history found for {filename}."}) + old_content = path_history.pop() + file_history[history_key] = path_history + session.registry["file_history"] = json.dumps(file_history) + doc_path.write_text(old_content, encoding="utf-8") + return json.dumps({"status": "undone", "filename": filename}) + + if not doc_path.exists(): + return json.dumps({"error": f"File not found: {filename}. 
Use write_doc_file to create it."}) + + # Save current content to history before editing + current_content = doc_path.read_text(encoding="utf-8") + history_key = str(doc_path) + history = session.registry.get("file_history", "{}") + file_history = json.loads(history) if isinstance(history, str) else history + file_history.setdefault(history_key, []).append(current_content) + session.registry["file_history"] = json.dumps(file_history) + + if command == "str_replace": + old_str = arguments.get("old_str") + new_str = arguments.get("new_str", "") + if old_str is None: + return json.dumps({"error": "old_str is required for str_replace."}) + + occurrences = current_content.count(old_str) + if occurrences == 0: + return json.dumps({"error": f"old_str not found in {filename}."}) + if occurrences > 1: + return json.dumps({"error": f"old_str appears {occurrences} times in {filename}. Make it unique."}) + + new_content = current_content.replace(old_str, new_str, 1) + doc_path.write_text(new_content, encoding="utf-8") + + # Snippet around the edit + replacement_line = current_content.split(old_str)[0].count("\n") + lines = new_content.split("\n") + start = max(0, replacement_line - 4) + end = min(len(lines), replacement_line + new_str.count("\n") + 5) + snippet = "\n".join(f"{i + start + 1:6}\t{lines[i]}" for i in range(start, end)) + + elif command == "insert": + insert_line = arguments.get("insert_line", 0) + new_str = arguments.get("new_str", "") + if not new_str: + return json.dumps({"error": "new_str is required for insert."}) + + lines = current_content.split("\n") + insert_line = max(0, min(insert_line, len(lines))) + new_str_lines = new_str.split("\n") + lines = lines[:insert_line] + new_str_lines + lines[insert_line:] + new_content = "\n".join(lines) + doc_path.write_text(new_content, encoding="utf-8") + + start = max(0, insert_line - 4) + end = min(len(lines), insert_line + len(new_str_lines) + 4) + snippet = "\n".join(f"{i + start + 1:6}\t{lines[i]}" for i in 
range(start, end)) + + else: + return json.dumps({"error": f"Unknown command: {command}. Use str_replace, insert, or undo."}) + + # Mermaid validation + mermaid_result = await _validate_mermaid(str(doc_path), filename) + + result = { + "status": "edited", + "command": command, + "filename": filename, + "snippet": snippet, + "mermaid_validation": mermaid_result, + } + return json.dumps(result, indent=2, ensure_ascii=False) diff --git a/codewiki/mcp/tools/module_tree.py b/codewiki/mcp/tools/module_tree.py new file mode 100644 index 00000000..8d7fa3a1 --- /dev/null +++ b/codewiki/mcp/tools/module_tree.py @@ -0,0 +1,133 @@ +"""MCP tools: save_module_tree + get_processing_order. + +The IDE agent decides how to group components into modules (clustering) +using its own LLM. These tools persist that decision and compute the +leaf-first processing order for documentation generation. +""" + +from __future__ import annotations + +import json +import logging +import os +from pathlib import Path +from typing import Any, Dict, List, Tuple + +from codewiki.mcp.session import SessionState, SessionStore +from codewiki.src.config import FIRST_MODULE_TREE_FILENAME, MODULE_TREE_FILENAME + +logger = logging.getLogger(__name__) + + +def _get_processing_order(module_tree: Dict[str, Any], parent_path: List[str] = []) -> List[Dict[str, Any]]: + """Compute leaf-first processing order from a module tree. + + Returns a list of dicts with module path, name, leaf status, and + component/children info. 
+ """ + order: List[Dict[str, Any]] = [] + + def _collect(tree: Dict[str, Any], path: List[str]) -> None: + for module_name, module_info in tree.items(): + current_path = path + [module_name] + children = module_info.get("children", {}) + has_children = isinstance(children, dict) and len(children) > 0 + + if has_children: + _collect(children, current_path) + order.append({ + "module": module_name, + "path": current_path, + "is_leaf": False, + "children": list(children.keys()), + "components": module_info.get("components", []), + }) + else: + order.append({ + "module": module_name, + "path": current_path, + "is_leaf": True, + "components": module_info.get("components", []), + }) + + _collect(module_tree, parent_path) + return order + + +def handle_save_module_tree( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Persist the IDE agent's clustering result as the module tree.""" + session_id = arguments["session_id"] + session = store.get(session_id) + if session is None: + return json.dumps({"error": f"Session {session_id} not found or expired."}) + + module_tree = arguments["module_tree"] + output_dir = session.output_dir + + # Save both immutable snapshot and mutable working copy + first_path = os.path.join(output_dir, FIRST_MODULE_TREE_FILENAME) + working_path = os.path.join(output_dir, MODULE_TREE_FILENAME) + + os.makedirs(output_dir, exist_ok=True) + + with open(first_path, "w", encoding="utf-8") as f: + json.dump(module_tree, f, indent=2, ensure_ascii=False) + with open(working_path, "w", encoding="utf-8") as f: + json.dump(module_tree, f, indent=2, ensure_ascii=False) + + # Cache in session + session.module_tree = module_tree + + # Compute processing order + order = _get_processing_order(module_tree) + + result = { + "status": "saved", + "module_count": len(module_tree), + "processing_order": order, + "tree_path": working_path, + "first_tree_path": first_path, + "hint": ( + "Use get_processing_order(session_id) to retrieve this order again. 
" + "Process leaf modules first (is_leaf=true), then parent modules. " + "For each leaf module: get_prompt('system_leaf') + read_code_components + write_doc_file. " + "For each parent module: get_prompt('overview_module') + write_doc_file." + ), + } + return json.dumps(result, indent=2, ensure_ascii=False) + + +def handle_get_processing_order( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Return the leaf-first processing order for the saved module tree.""" + session_id = arguments["session_id"] + session = store.get(session_id) + if session is None: + return json.dumps({"error": f"Session {session_id} not found or expired."}) + + # Try session cache first, then disk + module_tree = session.module_tree + if not module_tree: + tree_path = os.path.join(session.output_dir, MODULE_TREE_FILENAME) + if os.path.exists(tree_path): + with open(tree_path, encoding="utf-8") as f: + module_tree = json.load(f) + session.module_tree = module_tree + else: + return json.dumps({ + "error": "Module tree not found. Call save_module_tree first." + }) + + order = _get_processing_order(module_tree) + + result = { + "session_id": session_id, + "module_count": len(module_tree), + "order": order, + } + return json.dumps(result, indent=2, ensure_ascii=False) diff --git a/codewiki/mcp/tools/prompt_server.py b/codewiki/mcp/tools/prompt_server.py new file mode 100644 index 00000000..04fe2347 --- /dev/null +++ b/codewiki/mcp/tools/prompt_server.py @@ -0,0 +1,176 @@ +"""MCP tool: get_prompt — serve CodeWiki's prompt templates to the IDE agent. + +CodeWiki ships with carefully designed prompt templates for each stage of +the wiki generation pipeline. This tool lets the IDE agent retrieve them +(with optional variable substitution) so it can follow the same proven +methodology without needing its own copy of the prompts. 
+""" + +from __future__ import annotations + +import json +import logging +from typing import Any, Dict, Optional + +from codewiki.mcp.session import SessionStore +from codewiki.src.be.prompt_template import ( + CLUSTER_REPO_PROMPT, + CLUSTER_MODULE_PROMPT, + SYSTEM_PROMPT, + LEAF_SYSTEM_PROMPT, + USER_PROMPT, + REPO_OVERVIEW_PROMPT, + MODULE_OVERVIEW_PROMPT, + format_system_prompt, + format_leaf_system_prompt, + format_cluster_prompt, + format_user_prompt, +) + +logger = logging.getLogger(__name__) + + +# Prompt catalog: maps prompt_type to (raw_template, usage_hint, variables_doc) +_PROMPT_CATALOG: Dict[str, Dict[str, str]] = { + "cluster": { + "description": "Prompt for grouping components into modules. The LLM receives a component list and returns a JSON module tree.", + "usage_hint": ( + "Use this prompt to cluster components into logical modules. " + "The response should contain JSON. " + "Pass the component list from analyze_repo's component_index." + ), + }, + "system_complex": { + "description": "System prompt for documenting a complex (multi-file, parent) module. Includes sub-module delegation instructions.", + "usage_hint": ( + "Use as the system prompt when generating docs for a parent module. " + "The agent should create {module_name}.md with architecture overview " + "and cross-references to sub-module docs." + ), + }, + "system_leaf": { + "description": "System prompt for documenting a leaf (single-file or simple) module.", + "usage_hint": ( + "Use as the system prompt when generating docs for a leaf module. " + "The agent should create {module_name}.md with detailed documentation " + "including Mermaid diagrams." + ), + }, + "user": { + "description": "User prompt template that provides the module tree and core component source code.", + "usage_hint": ( + "Use as the user/assistant prompt alongside system_leaf or system_complex. " + "It provides the module tree context and the actual source code of core components." 
+ ), + }, + "overview_module": { + "description": "Prompt for generating a parent module overview from its children's documentation.", + "usage_hint": ( + "Use this after all child modules are documented. " + "Provide the module tree with children's docs embedded. " + "The response should be wrapped in tags." + ), + }, + "overview_repo": { + "description": "Prompt for generating the final repository overview.", + "usage_hint": ( + "Use this as the LAST step after all modules are documented. " + "Provide the full module tree with child docs. " + "Save the result as overview.md." + ), + }, +} + + +def handle_get_prompt( + arguments: Dict[str, Any], + store: SessionStore, +) -> str: + """Return a prompt template, optionally with variables filled in.""" + prompt_type = arguments["prompt_type"] + variables = arguments.get("variables", {}) + + if prompt_type not in _PROMPT_CATALOG: + available = list(_PROMPT_CATALOG.keys()) + return json.dumps({ + "error": f"Unknown prompt_type: {prompt_type}", + "available_types": available, + }) + + catalog_entry = _PROMPT_CATALOG[prompt_type] + + # Resolve the prompt content + content = _resolve_prompt(prompt_type, variables) + + result = { + "prompt_type": prompt_type, + "description": catalog_entry["description"], + "usage_hint": catalog_entry["usage_hint"], + "content": content, + } + return json.dumps(result, indent=2, ensure_ascii=False) + + +def _resolve_prompt(prompt_type: str, variables: Dict[str, Any]) -> str: + """Resolve a prompt template with optional variable substitution.""" + + if prompt_type == "cluster": + potential_core_components = variables.get("potential_core_components", "") + module_tree = variables.get("module_tree", {}) + module_name = variables.get("module_name", None) + return format_cluster_prompt( + potential_core_components=potential_core_components, + module_tree=module_tree, + module_name=module_name, + ) + + elif prompt_type == "system_complex": + module_name = variables.get("module_name", 
"MODULE_NAME") + custom_instructions = variables.get("custom_instructions", None) + return format_system_prompt(module_name, custom_instructions) + + elif prompt_type == "system_leaf": + module_name = variables.get("module_name", "MODULE_NAME") + custom_instructions = variables.get("custom_instructions", None) + return format_leaf_system_prompt(module_name, custom_instructions) + + elif prompt_type == "user": + # If full variables are provided, use the full formatter + session_id = variables.get("session_id") + module_name = variables.get("module_name", "MODULE_NAME") + core_component_ids = variables.get("core_component_ids", []) + module_tree = variables.get("module_tree", {}) + + if session_id and core_component_ids: + # Try to resolve from session + from codewiki.mcp.session import SessionStore + # We can't easily access the store here, so fall back to template + pass + + # Return the template with placeholders filled as possible + return USER_PROMPT.format( + module_name=module_name, + module_tree=json.dumps(module_tree, indent=2) if module_tree else "", + formatted_core_component_codes=variables.get( + "formatted_core_component_codes", + "" + ), + ) + + elif prompt_type == "overview_module": + module_name = variables.get("module_name", "MODULE_NAME") + repo_structure = variables.get("repo_structure", "") + return MODULE_OVERVIEW_PROMPT.format( + module_name=module_name, + repo_structure=repo_structure if isinstance(repo_structure, str) else json.dumps(repo_structure, indent=4), + ) + + elif prompt_type == "overview_repo": + repo_name = variables.get("repo_name", "REPO_NAME") + repo_structure = variables.get("repo_structure", "") + return REPO_OVERVIEW_PROMPT.format( + repo_name=repo_name, + repo_structure=repo_structure if isinstance(repo_structure, str) else json.dumps(repo_structure, indent=4), + ) + + return f"Unknown prompt type: {prompt_type}" From 7213a09d5e2239b9d46675776c2afedc394143ba Mon Sep 17 00:00:00 2001 From: mambo-wang Date: Sat, 20 Jun 2026 
21:05:56 +0800 Subject: [PATCH 2/5] fix: add missing MCP packages to pyproject.toml --- pyproject.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index b618d572..e4126004 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,7 +101,9 @@ packages = [ "codewiki.src.be.dependency_analyzer.analyzers", "codewiki.src.be.dependency_analyzer.models", "codewiki.src.be.dependency_analyzer.utils", - "codewiki.src.fe" + "codewiki.src.fe", + "codewiki.mcp", + "codewiki.mcp.tools" ] [tool.setuptools.package-data] From bfbda2e2563ac8ed738510bbc48aa539e9699a53 Mon Sep 17 00:00:00 2001 From: mambo-wang Date: Fri, 19 Jun 2026 13:24:34 +0800 Subject: [PATCH 3/5] feat: add incremental update support to MCP analyze_repo - Add _detect_changes() with git diff + mtime dual-strategy detection - Add _find_affected_modules() to map changed files to affected modules - analyze_repo now returns a 'changes' field with affected/cascade modules - Decouple codewiki/__init__.py from CLI imports for lightweight MCP startup - Update skill and IDE_DRIVEN_GUIDE.md with incremental update docs --- codewiki/__init__.py | 7 +- codewiki/mcp/server.py | 6 +- codewiki/mcp/tools/analysis.py | 214 ++++++++++++++++++++++++++++++++- 3 files changed, 221 insertions(+), 6 deletions(-) diff --git a/codewiki/__init__.py b/codewiki/__init__.py index 77f63b9a..e078a354 100644 --- a/codewiki/__init__.py +++ b/codewiki/__init__.py @@ -1,14 +1,13 @@ """ CodeWiki: Transform codebases into comprehensive documentation using AI-powered analysis. -This package provides a CLI tool for generating documentation from code repositories. +This package provides a CLI tool for generating documentation from code repositories, +and an MCP server for IDE-driven documentation generation. 
""" __version__ = "1.0.1" __author__ = "CodeWiki Contributors" __license__ = "MIT" -from codewiki.cli.main import cli - -__all__ = ["cli", "__version__"] +__all__ = ["__version__"] diff --git a/codewiki/mcp/server.py b/codewiki/mcp/server.py index 20d6abb3..cc7c6ac4 100644 --- a/codewiki/mcp/server.py +++ b/codewiki/mcp/server.py @@ -72,7 +72,11 @@ def _fine_grained_tools() -> list[Tool]: "using Tree-sitter AST parsing. Returns a component index and leaf nodes. " "No LLM required. This is the entry point for the wiki generation pipeline. " "After calling this, use get_prompt('cluster') to learn clustering rules, " - "then save_module_tree to persist your grouping." + "then save_module_tree to persist your grouping. " + "INCREMENTAL UPDATE: If docs already exist in output_dir (metadata.json + " + "module_tree.json), the response includes a 'changes' field showing which " + "files changed and which modules need updating. Use this to do targeted " + "edits instead of regenerating everything." ), inputSchema={ "type": "object", diff --git a/codewiki/mcp/tools/analysis.py b/codewiki/mcp/tools/analysis.py index 41c8db90..4e29b516 100644 --- a/codewiki/mcp/tools/analysis.py +++ b/codewiki/mcp/tools/analysis.py @@ -11,8 +11,9 @@ import json import logging import os +import time from pathlib import Path -from typing import Any, Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple from codewiki.mcp.session import SessionState, SessionStore @@ -36,6 +37,207 @@ def _build_component_index(components: Dict[str, Any], max_items: int = 500) -> return index, len(components) > max_items +# --------------------------------------------------------------------------- +# Incremental update: detect changes since last generation +# --------------------------------------------------------------------------- + +def _detect_changes( + repo_path: Path, + output_dir: Path, +) -> Optional[Dict[str, Any]]: + """Detect changes since last documentation generation. 
+ + Returns a changes dict with affected modules, or None if no previous + generation exists (first run). + + Detection strategy: + 1. Git-based: compare stored commit_id with current HEAD, plus check + uncommitted changes via ``git status``. + 2. Fallback: compare file mtime with stored ``timestamp`` in metadata. + """ + metadata_path = output_dir / "metadata.json" + module_tree_path = output_dir / "module_tree.json" + + if not metadata_path.exists() or not module_tree_path.exists(): + return None + + try: + metadata = json.loads(metadata_path.read_text()) + module_tree = json.loads(module_tree_path.read_text()) + except (json.JSONDecodeError, OSError): + return None + + # Try git-based detection first + changes = _detect_via_git(repo_path, metadata) + + # Fallback to mtime-based detection + if changes is None: + changes = _detect_via_mtime(repo_path, metadata) + + if changes is None: + return None + + changed_files = changes["changed_files"] + if not changed_files: + return { + "has_previous": True, + "no_changes": True, + "method": changes.get("method", "unknown"), + "message": "No changes detected since last generation. Documentation is up to date.", + } + + affected, cascade = _find_affected_modules(module_tree, changed_files) + + return { + "has_previous": True, + "no_changes": False, + "method": changes.get("method", "unknown"), + "changed_files": changed_files[:50], + "affected_modules": sorted(affected), + "cascade_modules": sorted(cascade), + "hint": ( + f"Only {len(affected)} module(s) need updating: {sorted(affected)}. " + f"Parent modules to refresh: {sorted(cascade)}. " + "Use edit_doc_file for targeted updates, write_doc_file for new modules." + ), + } + + +def _detect_via_git( + repo_path: Path, + metadata: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Detect changes via git. Returns None if not in a git repo. + + Checks both committed changes (diff against stored commit_id) and + uncommitted changes (``git status``). 
+ """ + try: + import git + repo = git.Repo(repo_path, search_parent_directories=True) + except Exception: + return None + + prev_commit = metadata.get("generation_info", {}).get("commit_id") + try: + current_commit = repo.head.commit.hexsha + except Exception: + return None + + changed: list[str] = [] + method = "git" + + # 1) Committed changes since last generation + if prev_commit and prev_commit != current_commit: + try: + diff_index = repo.commit(prev_commit).diff(current_commit) + seen: set[str] = set() + for diff in diff_index: + if diff.a_path and diff.a_path not in seen: + changed.append(diff.a_path) + seen.add(diff.a_path) + if diff.b_path and diff.b_path not in seen: + changed.append(diff.b_path) + seen.add(diff.b_path) + except Exception: + pass + + # 2) Uncommitted changes (user may have edited but not committed) + try: + for item in repo.untracked_files: + if item not in changed: + changed.append(item) + for file_path in [d.a_path for d in repo.index.diff(None)]: + if file_path and file_path not in changed: + changed.append(file_path) + except Exception: + pass + + return {"changed_files": changed, "method": method} + + +def _detect_via_mtime( + repo_path: Path, + metadata: Dict[str, Any], +) -> Optional[Dict[str, Any]]: + """Fallback: detect changed files by comparing mtime with generation timestamp.""" + timestamp_str = metadata.get("generation_info", {}).get("timestamp") + if not timestamp_str: + return None + + try: + from datetime import datetime + prev_time = datetime.fromisoformat(timestamp_str).timestamp() + except (ValueError, TypeError): + return None + + # Language extensions recognized by CodeWiki + source_extensions = { + ".py", ".java", ".js", ".jsx", ".ts", ".tsx", + ".c", ".h", ".cpp", ".hpp", ".cc", ".hh", + ".cs", ".kt", ".kts", + } + + changed: list[str] = [] + for dirpath, dirnames, filenames in os.walk(repo_path): + # Skip hidden dirs and common non-source dirs + dirnames[:] = [ + d for d in dirnames + if not d.startswith(".") and 
d not in ("node_modules", "__pycache__", "venv", ".venv") + ] + for filename in filenames: + filepath = Path(dirpath) / filename + if filepath.suffix.lower() not in source_extensions: + continue + try: + if filepath.stat().st_mtime > prev_time: + rel_path = str(filepath.relative_to(repo_path)) + changed.append(rel_path) + except OSError: + continue + + return {"changed_files": changed, "method": "mtime"} + + +def _find_affected_modules( + module_tree: Dict[str, Any], + changed_files: List[str], +) -> Tuple[set, set]: + """Map changed files to affected modules using module_tree.json. + + Uses substring matching (same as the CLI ``_invalidate_affected_modules``). + Returns (affected_modules, cascade_parent_modules). + """ + affected: set[str] = set() + cascade: set[str] = set() + + def _walk(tree: Dict, parents: list[str] | None = None): + if parents is None: + parents = [] + for mod_name, mod_info in tree.items(): + components = mod_info.get("components", []) + hit = False + for comp in components: + if any(cf in comp or comp in cf for cf in changed_files): + hit = True + break + if hit: + affected.add(mod_name) + cascade.update(parents) + + children = mod_info.get("children", {}) + if isinstance(children, dict) and children: + _walk(children, parents + [mod_name]) + + _walk(module_tree) + + # overview.md depends on all child docs, always refresh if anything changed + if affected: + cascade.add("overview") + + return affected, cascade + + def handle_analyze_repo( arguments: Dict[str, Any], store: SessionStore, @@ -92,6 +294,9 @@ def handle_analyze_repo( lang = getattr(node, "language", "unknown") languages[lang] = languages.get(lang, 0) + 1 + # Incremental update: detect changes since last generation + changes = _detect_changes(repo_path, output_dir) + result = { "session_id": session.session_id, "repo_name": repo_path.name, @@ -103,10 +308,17 @@ def handle_analyze_repo( "leaf_nodes": leaf_nodes[:100], "component_index": index, "component_index_truncated": 
truncated, + "changes": changes, "hint": ( "Use read_code_components(session_id, component_ids) to read source code. " "Use save_module_tree(session_id, module_tree) after clustering. " "Call get_prompt('cluster') for clustering rules." ), } + if changes and not changes.get("no_changes"): + result["hint"] = ( + "Incremental update detected. Only update affected modules listed in " + "'changes.affected_modules'. Use edit_doc_file for targeted updates. " + "Refresh cascade parent modules in 'changes.cascade_modules'." + ) return json.dumps(result, indent=2, ensure_ascii=False) From fe694a5c4c7ab4aa58793aeae5eb8954dd6959e6 Mon Sep 17 00:00:00 2001 From: mambo-wang Date: Fri, 19 Jun 2026 16:13:54 +0800 Subject: [PATCH 4/5] fix: pass commit_id to metadata.json in CLI mode for --update support Previously, CLIDocumentationGenerator never received or forwarded the git commit SHA, so metadata.json always had commit_id: null. This made --update fall back to full regeneration every time. Now the commit hash is obtained before generator creation and threaded through to the backend DocumentationGenerator, matching the behavior already present in Web mode (background_worker.py). --- codewiki/cli/adapters/doc_generator.py | 7 +++++-- codewiki/cli/commands/generate.py | 6 ++++-- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/codewiki/cli/adapters/doc_generator.py b/codewiki/cli/adapters/doc_generator.py index 61b1c1b7..cbda31ca 100644 --- a/codewiki/cli/adapters/doc_generator.py +++ b/codewiki/cli/adapters/doc_generator.py @@ -37,7 +37,8 @@ def __init__( output_dir: Path, config: Dict[str, Any], verbose: bool = False, - generate_html: bool = False + generate_html: bool = False, + commit_id: str = None, ): """ Initialize the CLI documentation generator. 
@@ -48,12 +49,14 @@ def __init__( config: LLM configuration verbose: Enable verbose output generate_html: Whether to generate HTML viewer + commit_id: Git commit SHA for incremental update tracking """ self.repo_path = repo_path self.output_dir = output_dir self.config = config self.verbose = verbose self.generate_html = generate_html + self.commit_id = commit_id self.progress_tracker = ProgressTracker(total_stages=5, verbose=verbose) self.job = DocumentationJob() @@ -178,7 +181,7 @@ async def _run_backend_generation(self, backend_config: BackendConfig): self.progress_tracker.update_stage(0.2, "Initializing dependency analyzer...") # Create documentation generator - doc_generator = DocumentationGenerator(backend_config) + doc_generator = DocumentationGenerator(backend_config, commit_id=self.commit_id) if self.verbose: self.progress_tracker.update_stage(0.5, "Parsing source files...") diff --git a/codewiki/cli/commands/generate.py b/codewiki/cli/commands/generate.py index 1c370cb8..d8c9afe8 100644 --- a/codewiki/cli/commands/generate.py +++ b/codewiki/cli/commands/generate.py @@ -525,6 +525,8 @@ def generate_command( agent_instructions_dict = config.agent_instructions.to_dict() # Create generator + # Get commit_id early so it can be stored in metadata.json for --update support + commit_id = get_git_commit_hash(repo_path) generator = CLIDocumentationGenerator( repo_path=repo_path, output_dir=output_dir, @@ -545,7 +547,8 @@ def generate_command( 'max_depth': max_depth if max_depth is not None else config.max_depth, }, verbose=verbose, - generate_html=github_pages + generate_html=github_pages, + commit_id=commit_id, ) # Run generation @@ -556,7 +559,6 @@ def generate_command( # Get repository info repo_url = None - commit_hash = get_git_commit_hash(repo_path) current_branch = get_git_branch(repo_path) if is_git_repository(repo_path): From 129f730cb7a7db686f22fdaa667dc4502f5def33 Mon Sep 17 00:00:00 2001 From: mambo-wang Date: Sat, 20 Jun 2026 21:14:16 +0800 Subject: 
[PATCH 5/5] SKILL --- skill/codewiki-wiki-generator/SKILL.md | 174 +++++++++++++++++++++++++ 1 file changed, 174 insertions(+) create mode 100644 skill/codewiki-wiki-generator/SKILL.md diff --git a/skill/codewiki-wiki-generator/SKILL.md b/skill/codewiki-wiki-generator/SKILL.md new file mode 100644 index 00000000..4a8ac2ff --- /dev/null +++ b/skill/codewiki-wiki-generator/SKILL.md @@ -0,0 +1,174 @@ +--- +name: codewiki-wiki-generator +description: "使用 CodeWiki-CN MCP 工具为代码仓库生成 Wiki 文档。当用户要求生成 Wiki、代码文档、仓库文档或分析代码库结构时使用此技能。需要已配置 CodeWiki-CN MCP 服务器。" +version: 1.0.0 +--- + +# CodeWiki 文档生成器 + +你是一位代码文档生成专家。使用 CodeWiki-CN 的 MCP 工具为代码仓库生成全面的 Wiki 文档。所有 9 个工具均**无需配置 LLM**——你提供全部智能推理能力,CodeWiki 提供工具链。 + +## 前置条件 + +开始前,确认 CodeWiki MCP 服务器可用。MCP 工具列表中应包含以下 9 个工具:`analyze_repo`、`read_code_components`、`view_repo_file`、`write_doc_file`、`edit_doc_file`、`save_module_tree`、`get_processing_order`、`get_prompt`、`close_session`。 + +如果工具不可用,请提示用户安装并配置 CodeWiki-CN: + +```bash +git clone https://github.com/mambo-wang/CodeWiki-CN.git +cd CodeWiki-CN && pip install -e . +``` + +然后在 MCP 配置中添加: + +```json +{"mcpServers":{"codewiki":{"command":"python","args":["-m","codewiki.mcp.server"],"cwd":"/path/to/CodeWiki-CN"}}} +``` + +## 五阶段工作流程 + +严格按以下顺序执行。阶段 1 之后的所有工具调用都需要 `analyze_repo` 返回的 `session_id`。 + +### 阶段 1:分析仓库 + +调用 `analyze_repo`: + +```json +{ "repo_path": "<仓库绝对路径>", "output_dir": "<仓库路径>/repowiki" } +``` + +返回内容:`session_id`、`component_index`(组件列表,含 id/type/file/depends_on)、`leaf_nodes`、`languages`。 + +**牢记 `session_id`**——后续每一步都需要它。 + +### 阶段 2:模块聚类 + +这是最需要理解力的阶段。你需要将组件分组为逻辑模块。 + +1. **获取聚类规则**:调用 `get_prompt`,参数 `{"prompt_type": "cluster"}` +2. **阅读源码**(组件超过 50 个时):分批调用 `read_code_components`,每批 15-20 个叶节点 ID,理解各组件的功能和关联 +3. **按以下原则分组**: + - 功能内聚:关系紧密的组件放入同一模块 + - 文件归属:同一文件/目录下的组件倾向归入同一模块 + - 规模控制:通常 3-8 个顶层模块,每个模块 5-30 个组件 + - 组件 ID 必须原样保留(含 `::` 前缀) +4. 
**保存模块树**:调用 `save_module_tree`: + +```json +{ + "session_id": "<session_id>", + "module_tree": { + "模块名": { + "components": ["file.py::ClassA", "file.py::func_b"], + "children": {} + } + } +} +``` + +返回结果中包含 `processing_order`——叶优先的文档生成顺序。 + +### 阶段 3:逐模块生成文档 + +按 `processing_order` 的顺序处理各模块。**先处理叶模块**,再处理父模块。 + +**每个叶模块**(is_leaf=true): + +1. 获取系统提示词:`get_prompt` → `{"prompt_type": "system_leaf", "variables": {"module_name": "<模块名>"}}` +2. 读取源码:`read_code_components` → 该模块所有组件 ID +3. 如需更多上下文,用 `view_repo_file` 补充读取 +4. 撰写文档,包含:模块简介与核心功能、架构图(至少 1 个 Mermaid 图表)、各组件职责说明、交叉引用 `[模块名](模块名.md)` +5. 保存:`write_doc_file` → `{"session_id": "...", "filename": "<模块名>.md", "content": "..."}` + +如果 Mermaid 校验失败,修正语法后用 `edit_doc_file`(`command: "str_replace"`)修改。 + +**每个父模块**(is_leaf=false): + +1. 用 `view_repo_file` 读取所有子模块已生成的 .md 文件 +2. 获取总览提示词:`get_prompt` → `{"prompt_type": "overview_module", "variables": {"module_name": "<模块名>"}}` +3. 综合子模块文档,生成父模块总览 +4. 用 `write_doc_file` 保存 + +### 阶段 4:生成仓库总览 + +1. 获取提示词:`get_prompt` → `{"prompt_type": "overview_repo", "variables": {"repo_name": "<仓库名>"}}` +2. 用 `view_repo_file` 读取所有已生成的模块文档 +3. 撰写仓库级总览,包含:项目简介、端到端架构图(Mermaid)、各模块文档的引用链接 +4. 保存:`write_doc_file` → `filename: "overview.md"` + +### 阶段 5:清理 + +调用 `close_session` → `{"session_id": "<session_id>"}` 释放内存。 + +## 增量更新模式 + +当仓库已生成过文档(`output_dir` 下存在 `metadata.json` 和 `module_tree.json`),`analyze_repo` 的返回结果会包含 `changes` 字段: + +```json +{ + "changes": { + "has_previous": true, + "no_changes": false, + "method": "git", + "changed_files": ["auth.py", "utils.py::hash_password"], + "affected_modules": ["认证模块"], + "cascade_modules": ["核心系统", "overview"] + } +} +``` + +**变更检测策略**:优先使用 `git diff`(对比 commit SHA + 检查工作区未提交变更),非 git 仓库回退到对比文件修改时间。 + +**增量更新流程**: + +1. 调用 `analyze_repo`,检查 `changes` 字段 +2. 如果 `no_changes: true`,告知用户文档已是最新,无需操作 +3. 如果 `no_changes: false`,**只更新 `affected_modules` 中列出的模块**: + - 用 `read_code_components` 读取变更组件的新源码 + - 用 `edit_doc_file`(`str_replace`)局部修改对应文档,而非整篇重写 +4.
对 `cascade_modules` 中的父模块,读取已更新的子文档后同步刷新总览 +5. 最后更新 `overview.md` + +增量更新的粒度是**模块级**——一个模块内任一组件变更,该模块文档需要更新。相比全量生成,增量更新通常只需处理 1-3 个模块。 + +## 工具速查表 + +| 工具 | 用途 | +|------|------| +| `analyze_repo` | 分析仓库,构建依赖图,返回组件索引 | +| `read_code_components` | 根据组件 ID 读取源码(格式:`文件::名称`) | +| `view_repo_file` | 只读浏览仓库文件/目录 | +| `write_doc_file` | 创建 .md 文档(自动 Mermaid 校验) | +| `edit_doc_file` | 编辑文档:`str_replace` / `insert` / `undo` | +| `save_module_tree` | 保存模块聚类结果 | +| `get_processing_order` | 获取叶优先的处理顺序 | +| `get_prompt` | 获取提示词模板:`cluster`、`system_leaf`、`system_complex`、`user`、`overview_module`、`overview_repo` | +| `close_session` | 关闭会话释放资源(2 小时自动过期) | + +## 文档质量标准 + +- **语言**:默认中文撰写(除非用户指定其他语言) +- **Mermaid 图表**:每个模块至少 1 个架构图,优先使用 `graph TD` 或 `graph LR` +- **交叉引用**:引用其他模块时使用 `[模块名](模块名.md)` 格式 +- **代码示例**:关键函数/类展示签名和简要用法 +- **篇幅**:叶模块文档 200-500 行,父模块总览 100-300 行,仓库总览 80-200 行 + +## Mermaid 语法规范 + +```mermaid +graph TD + A[组件A] --> B[组件B] + A --> C[组件C] +``` + +- 节点 ID 仅使用字母和数字(避免中文、空格、冒号) +- 节点标签用方括号包裹:`A[显示文本]` +- 子图语法:`subgraph 标题 ... end` +- 禁止使用 `click`、`linkStyle` 等交互语法 + +## 错误处理 + +- **Mermaid 校验失败**:工具会返回校验错误信息,修正语法后用 `edit_doc_file` + `str_replace` 重试 +- **会话过期**(2 小时超时):重新调用 `analyze_repo` 创建新会话 +- **大型仓库(>10 万行)**:`analyze_repo` 可能需要约 30 秒,可通过 `include_patterns`/`exclude_patterns` 缩小分析范围 +- **组件 ID 格式**:始终使用 `component_index` 中的原始 ID(如 `src/main.py::MyClass`),保留 `::` 分隔符