Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 173 additions & 5 deletions openkb/agent/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"""
from __future__ import annotations

import asyncio
import os
import re
import sys
Expand All @@ -15,8 +16,10 @@
from typing import Any

from prompt_toolkit import PromptSession
from prompt_toolkit.completion import Completer, Completion, PathCompleter
from prompt_toolkit.document import Document
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.shortcuts import print_formatted_text
from prompt_toolkit.shortcuts import CompleteStyle, print_formatted_text
from prompt_toolkit.styles import Style

from openkb.agent.chat_session import ChatSession
Expand All @@ -39,13 +42,23 @@
"resume.turn": "#5fa0e0",
"resume.user": "bold",
"resume.assistant": "#8a8a8a",
# Completion menu — lightweight, no heavy background
"completion-menu": "bg:default #8a8a8a",
"completion-menu.completion": "bg:default #d0d0d0",
"completion-menu.completion.current": "bg:#3a3a3a #ffffff bold",
"completion-menu.meta.completion": "bg:default #6a6a6a",
"completion-menu.meta.completion.current": "bg:#3a3a3a #8a8a8a",
}

_HELP_TEXT = (
"Commands:\n"
" /exit Exit (Ctrl-D also works)\n"
" /clear Start a fresh session (current one is kept on disk)\n"
" /save [name] Export transcript to wiki/explorations/\n"
" /status Show knowledge base status\n"
" /list List all documents in the knowledge base\n"
" /lint Lint the knowledge base\n"
" /add <path> Add a document or directory to the knowledge base\n"
" /help Show this"
)

Expand Down Expand Up @@ -181,10 +194,101 @@ def _bottom_toolbar(session: ChatSession) -> FormattedText:
)


def _make_prompt_session(session: ChatSession, style: Style, use_color: bool) -> PromptSession:
_SLASH_COMMANDS: list[tuple[str, str]] = [
("/exit", "Exit (Ctrl-D also works)"),
("/quit", "Exit (alias)"),
("/help", "Show available commands"),
("/clear", "Start a fresh session"),
("/save", "Export transcript to wiki/explorations/"),
("/status", "Show knowledge base status"),
("/list", "List all documents"),
("/lint", "Lint the knowledge base"),
("/add", "Add a document or directory"),
]


class _ChatCompleter(Completer):
"""Complete slash commands and file paths after /add."""

def __init__(self) -> None:
self._path_completer = PathCompleter(expanduser=True)

def get_completions(self, document: Document, complete_event: Any) -> Any:
text = document.text_before_cursor

# After "/add ", complete file paths (skip dotfiles)
if text.lstrip().lower().startswith("/add "):
path_text = text.lstrip()[5:]
# Strip leading quote so PathCompleter resolves the real path
quote_char = ""
if path_text and path_text[0] in ("'", '"'):
quote_char = path_text[0]
path_text = path_text[1:]
path_doc = Document(path_text, len(path_text))
for c in self._path_completer.get_completions(path_doc, complete_event):
# Hide dotfiles unless the user explicitly typed a dot
basename = c.text.lstrip("/")
if basename.startswith(".") and not path_text.rpartition("/")[2].startswith("."):
continue
# Append closing quote for files; skip for directories so
# the user can keep navigating into subdirectories.
if quote_char and not c.text.endswith("/"):
comp_text = c.text + quote_char
else:
comp_text = c.text
yield Completion(
comp_text,
start_position=c.start_position,
display=c.display,
display_meta=c.display_meta,
)
return

# Complete slash commands with descriptions
if text.startswith("/"):
for cmd, desc in _SLASH_COMMANDS:
if cmd.startswith(text.lower()):
yield Completion(cmd, start_position=-len(text), display_meta=desc)


def _make_prompt_session(session: ChatSession, style: Style, use_color: bool, kb_dir: Path) -> PromptSession:
from prompt_toolkit.filters import has_completions
from prompt_toolkit.history import FileHistory
from prompt_toolkit.key_binding import KeyBindings

kb = KeyBindings()

@kb.add("tab", filter=has_completions)
def _accept_completion(event: Any) -> None:
"""Tab accepts the current completion (like zsh), not cycle."""
buf = event.current_buffer
state = buf.complete_state
if not state:
return
# Only one candidate or already selected — accept immediately
if state.current_completion:
buf.apply_completion(state.current_completion)
elif len(state.completions) == 1:
buf.apply_completion(state.completions[0])
else:
# Multiple candidates, nothing selected — highlight first
buf.go_to_completion(0)

@kb.add("tab", filter=~has_completions)
def _trigger_completion(event: Any) -> None:
"""Tab triggers completion when menu is not open."""
buf = event.current_buffer
buf.start_completion()

history_path = kb_dir / ".openkb" / "chat_history"
return PromptSession(
message=FormattedText([("class:prompt", ">>> ")]),
style=style,
completer=_ChatCompleter(),
complete_style=CompleteStyle.MULTI_COLUMN,
complete_while_typing=False,
key_bindings=kb,
history=FileHistory(str(history_path)),
bottom_toolbar=(lambda: _bottom_toolbar(session)) if use_color else None,
)

Expand Down Expand Up @@ -271,6 +375,39 @@ def _save_transcript(kb_dir: Path, session: ChatSession, name: str | None) -> Pa
return path


async def _run_add(arg: str, kb_dir: Path, style: Style) -> None:
"""Add a document or directory to the knowledge base from the chat REPL."""
from openkb.cli import add_single_file, SUPPORTED_EXTENSIONS

target = Path(arg).expanduser()
if not target.is_absolute():
target = Path.cwd() / target
target = target.resolve()

if not target.exists():
_fmt(style, ("class:error", f"Path does not exist: {arg}\n"))
return

if target.is_dir():
files = [
f for f in sorted(target.rglob("*"))
if f.is_file() and f.suffix.lower() in SUPPORTED_EXTENSIONS
]
if not files:
_fmt(style, ("class:error", f"No supported files found in {arg}.\n"))
return
total = len(files)
_fmt(style, ("class:slash.help", f"Found {total} supported file(s) in {arg}.\n"))
for i, f in enumerate(files, 1):
_fmt(style, ("class:slash.help", f"\n[{i}/{total}] "))
await asyncio.to_thread(add_single_file, f, kb_dir)
else:
if target.suffix.lower() not in SUPPORTED_EXTENSIONS:
_fmt(style, ("class:error", f"Unsupported file type: {target.suffix}\n"))
return
await asyncio.to_thread(add_single_file, target, kb_dir)


async def _handle_slash(
cmd: str,
kb_dir: Path,
Expand All @@ -282,6 +419,11 @@ async def _handle_slash(
parts = cmd.split(maxsplit=1)
head = parts[0].lower()
arg = parts[1].strip() if len(parts) > 1 else ""
# Strip surrounding quotes (user may type /add '/path/to file')
if len(arg) >= 2 and arg[0] == arg[-1] and arg[0] in ("'", '"'):
arg = arg[1:-1]
elif arg and arg[0] in ("'", '"'):
arg = arg[1:]

if head in ("/exit", "/quit"):
_fmt(style, ("class:header", "Bye. Thanks for using OpenKB.\n\n"))
Expand All @@ -307,6 +449,28 @@ async def _handle_slash(
_fmt(style, ("class:slash.ok", f"Saved to {path}\n"))
return None

if head == "/status":
from openkb.cli import print_status
print_status(kb_dir)
return None

if head == "/list":
from openkb.cli import print_list
print_list(kb_dir)
return None

if head == "/lint":
from openkb.cli import run_lint
await run_lint(kb_dir)
return None

if head == "/add":
if not arg:
_fmt(style, ("class:error", "Usage: /add <path>\n"))
return None
await _run_add(arg, kb_dir, style)
return None

_fmt(
style,
("class:error", f"Unknown command: {head}. Try /help.\n"),
Expand Down Expand Up @@ -335,7 +499,7 @@ async def run_chat(
if session.turn_count > 0:
_print_resume_view(session, style)

prompt_session = _make_prompt_session(session, style, use_color)
prompt_session = _make_prompt_session(session, style, use_color, kb_dir)

last_sigint = 0.0

Expand All @@ -360,13 +524,17 @@ async def run_chat(
continue

if user_input.startswith("/"):
action = await _handle_slash(user_input, kb_dir, session, style)
try:
action = await _handle_slash(user_input, kb_dir, session, style)
except KeyboardInterrupt:
_fmt(style, ("class:error", "\n[aborted]\n"))
continue
if action == "exit":
return
if action == "new_session":
session = ChatSession.new(kb_dir, session.model, session.language)
agent = build_query_agent(wiki_root, session.model, language=language)
prompt_session = _make_prompt_session(session, style, use_color)
prompt_session = _make_prompt_session(session, style, use_color, kb_dir)
continue

append_log(kb_dir / "wiki", "query", user_input)
Expand Down
64 changes: 40 additions & 24 deletions openkb/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def _find_kb_dir(override: Path | None = None) -> Path | None:
return None


def _add_single_file(file_path: Path, kb_dir: Path) -> None:
def add_single_file(file_path: Path, kb_dir: Path) -> None:
"""Convert, index, and compile a single document into the knowledge base.

Steps:
Expand Down Expand Up @@ -346,15 +346,15 @@ def add(ctx, path):
click.echo(f"Found {total} supported file(s) in {path}.")
for i, f in enumerate(files, 1):
click.echo(f"\n[{i}/{total}] ", nl=False)
_add_single_file(f, kb_dir)
add_single_file(f, kb_dir)
else:
if target.suffix.lower() not in SUPPORTED_EXTENSIONS:
click.echo(
f"Unsupported file type: {target.suffix}. "
f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}"
)
return
_add_single_file(target, kb_dir)
add_single_file(target, kb_dir)


@cli.command()
Expand Down Expand Up @@ -519,24 +519,19 @@ def on_new_files(paths):
f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}"
)
continue
_add_single_file(fp, kb_dir)
add_single_file(fp, kb_dir)

click.echo(f"Watching {raw_dir} for new documents. Press Ctrl+C to stop.")
watch_directory(raw_dir, on_new_files)


@cli.command()
@click.option("--fix", is_flag=True, default=False, help="Automatically fix lint issues (not yet implemented).")
@click.pass_context
def lint(ctx, fix):
"""Lint the knowledge base for structural and semantic inconsistencies."""
if fix:
click.echo("Warning: --fix is not yet implemented. Running lint in report-only mode.")
kb_dir = _find_kb_dir(ctx.obj.get("kb_dir_override"))
if kb_dir is None:
click.echo("No knowledge base found. Run `openkb init` first.")
return
async def run_lint(kb_dir: Path) -> Path | None:
"""Run structural + knowledge lint, write report, return report path.

Returns ``None`` if the KB has no indexed documents (nothing to lint).
Async because knowledge lint uses an LLM agent. Usable from CLI
(via ``asyncio.run``) and directly from the chat REPL.
"""
from openkb.lint import run_structural_lint
from openkb.agent.linter import run_knowledge_lint

Expand All @@ -556,15 +551,13 @@ def lint(ctx, fix):
_setup_llm_key(kb_dir)
model: str = config.get("model", DEFAULT_CONFIG["model"])

# Structural lint
click.echo("Running structural lint...")
structural_report = run_structural_lint(kb_dir)
click.echo(structural_report)

# Knowledge lint (semantic)
click.echo("Running knowledge lint...")
try:
knowledge_report = asyncio.run(run_knowledge_lint(kb_dir, model))
knowledge_report = await run_knowledge_lint(kb_dir, model)
except Exception as exc:
knowledge_report = f"Knowledge lint failed: {exc}"
click.echo(knowledge_report)
Expand All @@ -579,17 +572,25 @@ def lint(ctx, fix):
report_path.write_text(report_content, encoding="utf-8")
append_log(kb_dir / "wiki", "lint", f"report → {report_path.name}")
click.echo(f"\nReport written to {report_path}")
return report_path


@cli.command(name="list")
@cli.command()
@click.option("--fix", is_flag=True, default=False, help="Automatically fix lint issues (not yet implemented).")
@click.pass_context
def list_cmd(ctx):
"""List all documents in the knowledge base."""
def lint(ctx, fix):
"""Lint the knowledge base for structural and semantic inconsistencies."""
if fix:
click.echo("Warning: --fix is not yet implemented. Running lint in report-only mode.")
kb_dir = _find_kb_dir(ctx.obj.get("kb_dir_override"))
if kb_dir is None:
click.echo("No knowledge base found. Run `openkb init` first.")
return
asyncio.run(run_lint(kb_dir))


def print_list(kb_dir: Path) -> None:
"""Print all documents in the knowledge base. Usable from CLI and chat REPL."""
openkb_dir = kb_dir / ".openkb"
hashes_file = openkb_dir / "hashes.json"
if not hashes_file.exists():
Expand Down Expand Up @@ -642,15 +643,19 @@ def list_cmd(ctx):
click.echo(f" - {r}")


@cli.command()
@cli.command(name="list")
@click.pass_context
def status(ctx):
"""Show the current status of the knowledge base."""
def list_cmd(ctx):
"""List all documents in the knowledge base."""
kb_dir = _find_kb_dir(ctx.obj.get("kb_dir_override"))
if kb_dir is None:
click.echo("No knowledge base found. Run `openkb init` first.")
return
print_list(kb_dir)


def print_status(kb_dir: Path) -> None:
"""Print knowledge base status. Usable from CLI and chat REPL."""
wiki_dir = kb_dir / "wiki"
subdirs = ["sources", "summaries", "concepts", "reports"]

Expand Down Expand Up @@ -698,3 +703,14 @@ def status(ctx):
import datetime
mtime = datetime.datetime.fromtimestamp(newest_report.stat().st_mtime)
click.echo(f" Last lint: {mtime.strftime('%Y-%m-%d %H:%M:%S')}")


@cli.command()
@click.pass_context
def status(ctx):
"""Show the current status of the knowledge base."""
kb_dir = _find_kb_dir(ctx.obj.get("kb_dir_override"))
if kb_dir is None:
click.echo("No knowledge base found. Run `openkb init` first.")
return
print_status(kb_dir)
Loading