diff --git a/.gitignore b/.gitignore index cb27d9a5..a0f47b6e 100644 --- a/.gitignore +++ b/.gitignore @@ -94,6 +94,11 @@ memory/L4_raw_sessions/* # Memory management !memory/memory_cleanup_sop.md +# Codex session distillation tool (state remains ignored under memory/codex_distill/) +!memory/codex_session_distill.py +!memory/codex_session_distill_sop.md +!memory/codex_coding_sop.md + # Visual Studio .vs/ restore_commit.txt diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 00000000..b193914b --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,32 @@ +# Repository Guidelines + +## 项目结构与模块组织 + +GenericAgent 是一个紧凑的 Python 项目。核心运行时代码位于仓库根目录,包括 `agentmain.py`、`agent_loop.py`、`ga.py`、`llmcore.py` 和 `simphtml.py`。可安装的 CLI 包在 `ga_cli/`,`ga` 命令入口映射到 `ga_cli.cli:main`。各类界面和聊天/机器人适配器位于 `frontends/`;图片、皮肤和静态资源位于 `frontends/skins/` 与 `assets/`。长期记忆、SOP 和辅助工具位于 `memory/`,反射与自主运行辅助逻辑位于 `reflect/`,可选集成放在 `plugins/`。测试统一放在 `tests/`。 + +## 构建、测试与本地运行 + +- `python -m pip install -e .`:以 editable 模式安装核心包和 `ga` 命令。 +- `python -m pip install -e ".[ui]"`:安装核心依赖和桌面/TUI UI 依赖。 +- `python launch.pyw`:启动默认桌面界面。 +- `python frontends/tuiapp.py`:启动终端 UI。 +- `streamlit run frontends/stapp2.py`:启动 Streamlit 前端。 +- `python -m unittest discover -s tests`:运行当前测试套件。 + +只安装正在修改的前端或机器人适配器所需的可选依赖。 + +## 编码风格与命名规范 + +使用 Python 3.10-3.13。代码应保持紧凑、可读,并贴合现有文件风格。优先使用自解释的函数和变量,少写解释性注释。避免过宽的 `try/except`,重要错误应清晰暴露。模块、函数和变量使用 `snake_case`,类名使用 `PascalCase`。新增模块应靠近功能边界,例如 UI 适配放在 `frontends/`,可选集成放在 `plugins/`。 + +## 测试指南 + +测试使用标准库 `unittest`。测试文件命名为 `test_*.py`,放在 `tests/`。新增前端或适配器行为时,应 stub 外部服务,避免依赖真实 API 凭据。提交前运行 `python -m unittest discover -s tests`,修复 bug 时补充聚焦的回归测试。 + +## 提交与 PR 规范 + +近期历史使用 Conventional Commits,例如 `feat(tui): ...`、`fix(tgapp): ...`、`docs: ...` 和 `refactor: ...`。提交应小而聚焦。PR 应说明背景、概述行为变化、列出验证命令;只有可见 UI 变化才附截图。避免不必要的新依赖和大范围重构。 + +## 安全与配置提示 + +不要提交真实 API key 或本地密钥。配置示例应维护在 `mykey_template.py`、`mykey_template_en.py` 或 `assets/configure_mykey.py`。本地生成状态、日志和凭据应保持在版本控制之外。 diff --git 
a/agentmain.py b/agentmain.py index 6cd7aab8..58543eaa 100644 --- a/agentmain.py +++ b/agentmain.py @@ -1,4 +1,4 @@ -import os, sys, threading, queue, time, json, re, random, locale +import os, sys, threading, queue, time, json, re, random, locale, base64, mimetypes os.environ.setdefault('GA_LANG', 'zh' if any(k in (locale.getlocale()[0] or '').lower() for k in ('zh', 'chinese')) else 'en') if sys.stdout is None: sys.stdout = open(os.devnull, "w") elif hasattr(sys.stdout, 'reconfigure'): sys.stdout.reconfigure(errors='replace') @@ -11,6 +11,50 @@ from ga import GenericAgentHandler, smart_format, get_global_memory, format_error, consume_file script_dir = os.path.dirname(os.path.abspath(__file__)) +_IMAGE_EXTS = {'.png', '.jpg', '.jpeg', '.webp', '.gif', '.bmp'} + + +def _extract_image_paths(text): + paths = [] + for raw in re.findall(r'"([^"]+\.(?:png|jpe?g|webp|gif|bmp))"|\'([^\']+\.(?:png|jpe?g|webp|gif|bmp))\'|(\S+\.(?:png|jpe?g|webp|gif|bmp))', text or '', re.I): + p = next((x for x in raw if x), '') + if not p: continue + path = p if os.path.isabs(p) else os.path.join(script_dir, p) + if os.path.isfile(path) and path not in paths: + paths.append(path) + return paths + + +def _image_block(path): + media_type = mimetypes.guess_type(path)[0] or 'image/png' + with open(path, 'rb') as f: + data = base64.b64encode(f.read()).decode('ascii') + return {"type": "image", "source": {"type": "base64", "media_type": media_type, "data": data}} + + +def _build_user_content_with_images(text, images=None): + image_paths = [] + for p in list(images or []) + _extract_image_paths(text): + path = p if os.path.isabs(str(p)) else os.path.join(script_dir, str(p)) + if os.path.isfile(path) and os.path.splitext(path)[1].lower() in _IMAGE_EXTS and path not in image_paths: + image_paths.append(path) + if not image_paths: + return None + content = [{"type": "text", "text": text or ""}] + for path in image_paths: + try: + content.append(_image_block(path)) + except Exception as e: + 
content[0]["text"] += f"\n[图片附件读取失败: {path}: {e}]" + return content + + +def _native_image_input_enabled(llmclient): + backend = getattr(llmclient, 'backend', None) + backend = getattr(backend, 'primary', backend) + return isinstance(backend, NativeOAISession) and bool(getattr(backend, 'native_image_input', False)) + + def load_tool_schema(suffix=''): global TOOLS_SCHEMA TS = open(os.path.join(script_dir, f'assets/tools_schema{suffix}.json'), 'r', encoding='utf-8').read() @@ -64,14 +108,19 @@ def load_llm_sessions(self): if 'mixin' in k: llm_sessions += [{'mixin_cfg': cfg}] elif c := resolve_client(k): llm_sessions += [c] except: pass + resolved_sessions = [] for i, s in enumerate(llm_sessions): if isinstance(s, dict) and 'mixin_cfg' in s: try: mixin = MixinSession(llm_sessions, s['mixin_cfg']) - if isinstance(mixin._sessions[0], (NativeClaudeSession, NativeOAISession)): llm_sessions[i] = NativeToolClient(mixin) - else: llm_sessions[i] = ToolClient(mixin) + if isinstance(mixin._sessions[0], (NativeClaudeSession, NativeOAISession)): resolved_sessions.append(NativeToolClient(mixin)) + else: resolved_sessions.append(ToolClient(mixin)) except Exception as e: print(f'\n\n\n[ERROR] Failed to init MixinSession with cfg {s["mixin_cfg"]}: {e}!!!\n\n') - self.llmclients = llm_sessions + else: + resolved_sessions.append(s) + if not resolved_sessions: raise Exception('[ERROR] No available LLM sessions: Check your mykey.py') + self.llmclients = resolved_sessions + self.llm_no %= len(self.llmclients) self.llmclient = self.llmclients[self.llm_no%len(self.llmclients)] if oldhistory: self.llmclient.backend.history = oldhistory @@ -125,7 +174,7 @@ def _handle_slash_cmd(self, raw_query, display_queue): def run(self): while True: task = self.task_queue.get() - raw_query, source, display_queue = task["query"], task["source"], task["output"] + raw_query, source, images, display_queue = task["query"], task["source"], task.get("images") or [], task["output"] raw_query = 
self._handle_slash_cmd(raw_query, display_queue) if raw_query is None: self.task_queue.task_done(); continue @@ -143,8 +192,10 @@ def run(self): if ps > 0: handler.working['key_info'] += f'\n[SYSTEM] 此为 {ps} 个对话前设置的key_info,若已在新任务,先更新或清除工作记忆。\n' self.handler = handler # although new handler, the **full** history is in llmclient, so it is full history! self.llmclient.log_path = self.log_path - gen = agent_runner_loop(self.llmclient, sys_prompt, raw_query, - handler, TOOLS_SCHEMA, max_turns=70, verbose=self.verbose) + initial_content = _build_user_content_with_images(raw_query, images) if _native_image_input_enabled(self.llmclient) else None + gen = agent_runner_loop(self.llmclient, sys_prompt, raw_query, + handler, TOOLS_SCHEMA, max_turns=70, verbose=self.verbose, + initial_user_content=initial_content) try: full_resp = ""; last_pos = 0 for chunk in gen: diff --git a/assets/global_mem_insight_template.txt b/assets/global_mem_insight_template.txt index 905ea7fa..898a9062 100644 --- a/assets/global_mem_insight_template.txt +++ b/assets/global_mem_insight_template.txt @@ -2,7 +2,7 @@ 需要时read L2 或 ls ../memory/ 查L3 L0(META-SOP): memory_management_sop L2: 现空 -L3: memory_cleanup_sop(记忆整理) | skill_search | ui_detect.py | ocr_utils.py | subagent | web_setup_sop | plan_sop +L3: memory_cleanup_sop(记忆整理) | skill_search | ui_detect.py | ocr_utils.py | subagent | web_setup_sop | plan_sop | codex_session_distill_sop | procmem_scanner | keychain | ljqCtrl_sop+.py | tmwebdriver_sop | autonomous_operation_sop | scheduled_task_sop | vision_sop | adb_ui.py L4: L4_raw_sessions/ 历史会话 @@ -10,6 +10,7 @@ L4: L4_raw_sessions/ 历史会话 键鼠: ljqCtrl_sop(禁pyautogui/先activate) 截图/视觉: ocr/vision_sop | 禁全屏截图,优先窗口 定时:scheduled_task_sop | 自主:autonomous_operation_sop | watchdog/反射:agentmain --reflect 手机:adb_ui.py +编码经验:codex_coding_sop [RULES] 1. 
搜索先行: 搜文件名严禁不用es(禁PS递归/禁dir遍历), 搜索一定优先使用web工具的google(严禁duckduckgo等), 优先看cwd,禁猜路径 diff --git a/assets/tools_schema.json b/assets/tools_schema.json index 0ba8b0ef..4f65f14d 100644 --- a/assets/tools_schema.json +++ b/assets/tools_schema.json @@ -66,9 +66,21 @@ "question": {"type": "string", "description": "Question for the user"}, "candidates": {"type": "array", "items": {"type": "string"}, "description": "Optional quick-select choices for the user"}}} }}, + {"type": "function", "function": { + "name": "codex_lesson_update", + "description": "Record one LLM-proposed reusable coding lesson after reading a redacted Codex session packet. LLM analysis is required: do not use this for simple rule matches, raw logs, secrets, private paths, or project-specific facts. The tool validates, deduplicates, and stores candidates; independent source_hash values become promotion evidence.", + "parameters": {"type": "object", "properties": { + "title": {"type": "string", "description": "Short reusable lesson title"}, + "guidance": {"type": "string", "description": "Concrete reusable guidance, not project-specific"}, + "category": {"type": "string", "description": "workflow/debugging/testing/git/security/frontend/planning/communication/quality", "default": "workflow"}, + "evidence": {"type": "array", "items": {"type": "string"}, "description": "Short evidence signals from the redacted packet timeline, verification, failure recovery, or seed observations"}, + "source_hash": {"type": "string", "description": "Packet/session hash prefix from the packet; used as independent promotion evidence"}, + "state_dir": {"type": "string", "description": "Optional isolated distill state directory for tests or explicit runs; omit for the default memory/codex_distill state"}, + "confidence": {"type": "number", "description": "0.0-1.0 confidence", "default": 0.5}}} + }}, {"type": "function", "function": { "name": "start_long_term_update", "description": "Start distilling long-term memory. 
Call when discovering info worth remembering (env facts/user prefs/lessons learned). Skip if memory already updated or in autonomous flow. Must call when a task that took 15+ turns is completed", "parameters": {"type": "object", "properties": {}}} } -] \ No newline at end of file +] diff --git a/assets/tools_schema_cn.json b/assets/tools_schema_cn.json index 17f14c3a..fe6d1c41 100644 --- a/assets/tools_schema_cn.json +++ b/assets/tools_schema_cn.json @@ -66,9 +66,21 @@ "question": {"type": "string", "description": "向用户提出的明确问题"}, "candidates": {"type": "array", "items": {"type": "string"}, "description": "提供给用户的可选快捷选项列表"}}} }}, + {"type": "function", "function": { + "name": "codex_lesson_update", + "description": "在阅读脱敏 Codex session packet 后,记录一条由 LLM 分析提出的可复用编码经验。必须由 LLM 基于完整 packet 判断,不要用于简单规则匹配、原始日志、密钥、私有路径或项目特定事实。工具会校验、去重并写入候选库;独立 source_hash 会作为晋升证据。", + "parameters": {"type": "object", "properties": { + "title": {"type": "string", "description": "简短的可复用经验标题"}, + "guidance": {"type": "string", "description": "具体可复用做法,不能是项目特定事实"}, + "category": {"type": "string", "description": "workflow/debugging/testing/git/security/frontend/planning/communication/quality", "default": "workflow"}, + "evidence": {"type": "array", "items": {"type": "string"}, "description": "来自脱敏 packet 的 timeline、验证命令、失败恢复或 seed observation 的短证据信号"}, + "source_hash": {"type": "string", "description": "packet 中的 session hash 前缀;用于独立晋升证据计数"}, + "state_dir": {"type": "string", "description": "可选的隔离蒸馏状态目录,用于测试或显式指定运行;不填则使用默认 memory/codex_distill"}, + "confidence": {"type": "number", "description": "0.0-1.0置信度", "default": 0.5}}} + }}, {"type": "function", "function": { "name": "start_long_term_update", "description": "准备开始提炼记忆。发现值得长期记忆的信息(环境事实/用户偏好/避坑经验)时调用此工具。已记忆更新或在自主流程内时无需调用。超15轮完成的任务必须调用以沉淀经验", "parameters": {"type": "object", "properties": {}}} } -] \ No newline at end of file +] diff --git "a/dev_ideas/20260513_\345\212\240\345\205\245codex\350\222\270\351\246\217\346\212\200\350\203\275.md" 
"b/dev_ideas/20260513_\345\212\240\345\205\245codex\350\222\270\351\246\217\346\212\200\350\203\275.md" new file mode 100644 index 00000000..28542ca8 --- /dev/null +++ "b/dev_ideas/20260513_\345\212\240\345\205\245codex\350\222\270\351\246\217\346\212\200\350\203\275.md" @@ -0,0 +1,654 @@ +# 20260513 加入 Codex 蒸馏技能 + +## 背景与动机 + +GenericAgent 的自我进化设计强调“进化的是策略,不是工具”。工具层保持稳定,经验、SOP、脚本沉淀在 `memory/` 中,后续任务通过 L1 索引发现并复用这些策略。 + +但这套机制存在冷启动问题:初始 `memory/` 里缺少足够多的复杂编码任务经验时,GenericAgent 每次遇到代码任务仍然需要大量探索。用户提出一个直接思路:本机 Codex 已经积累了大量高级编码 agent 的 JSONL 会话历史,这些会话包含真实的任务拆解、工具调用、失败恢复、测试验证和最终交付过程,完全可以作为 GenericAgent 的外部经验源。 + +这次 feature 的目标不是“训练模型权重”,也不是把 Codex 原始对话塞进提示词,而是把 Codex 历史当作 L4 raw experience: + +```text +Codex JSONL 会话 +→ 脱敏、压缩、结构化 packet +→ 规则蒸馏出高置信 lesson +→ GA/LLM 阅读 packet 发现更灵活的候选 lesson +→ 专用工具校验、去重、晋升 +→ 渲染成 memory/codex_coding_sop.md +→ L1 只保留短索引 +``` + +这个思路和项目原有 `start_long_term_update` 的设计一致:LLM 负责判断和压缩经验,程序和工具负责边界、执行、反馈、写入。 + +## 设计取舍 + +### 为什么不能直接导入 JSONL + +Codex JSONL 里可能包含: + +- 用户私有路径,例如 `C:\Users\...` +- 命令输出、日志、临时错误 +- 项目特定事实 +- API key、token、cookie、`.env` 等敏感内容 +- 失败尝试和一次性上下文 + +如果直接导入 `memory/`,会污染长期记忆,也违反 `memory_management_sop.md` 中“行动验证、长期有效、最小充分指针”的原则。 + +### 为什么第一版不是纯 LLM + +LLM 不能被当成可信的格式执行器。它可能输出非法 JSON、漏字段、夹带路径或把一次性细节误判为通用经验。所以第一阶段先做确定性程序: + +- 解析 JSONL +- 脱敏 +- 识别工具调用信号 +- 维护进度账本 +- 渲染 SOP + +第一版只内置 4 条高置信规则,证明闭环可靠。 + +### 为什么后来加入 `codex_lesson_update` + +用户指出:GenericAgent 原本的进化是有 LLM 参与的,纯规则识别不够灵活。重新阅读代码后确认,项目现有自我进化不是“LLM 输出某个最终 JSON,程序解析”,而是: + +```text +LLM 小步调用工具 +→ 程序执行工具 +→ 程序返回成功/失败 +→ LLM 根据反馈继续修正 +``` + +对应代码: + +- `agent_loop.py` 负责解析工具调用并 dispatch。 +- `ga.py::do_start_long_term_update` 触发长期记忆结算。 +- `file_read/file_patch/file_write/code_run` 作为物理工具执行写入。 +- `llmcore.py` 对原生工具调用和文本 `` 做解析与容错。 + +因此最终采用更贴合项目思想的设计: + +```text +LLM 不直接 patch 正式 SOP +LLM 读取脱敏 packet +LLM 调用 codex_lesson_update 提议一条候选经验 +codex_lesson_update 做校验、脱敏、去重、落盘 +promote 再将候选晋升为正式 lesson 并渲染 SOP +``` + +## 文件与职责 + +### `memory/codex_session_distill.py` + 
+这是核心实现,零新增依赖,只使用标准库。 + +主要常量: + +```python +DEFAULT_STATE_DIR = Path(__file__).resolve().parent / "codex_distill" +DEFAULT_SOP_PATH = Path(__file__).resolve().parent / "codex_coding_sop.md" +DEFAULT_CODEX_ROOT = Path.home() / ".codex" / "sessions" +DEFAULT_ROOT_SENTINEL = "__AUTO_CODEX_SESSIONS__" +VALID_CATEGORIES = { + "workflow", "debugging", "testing", "git", "security", + "frontend", "planning", "communication", "quality" +} +``` + +`memory/codex_distill/` 是本机状态目录,仍被 `.gitignore` 忽略,不提交: + +- `progress.json`:session hash 进度账本 +- `lessons.jsonl`:正式 lesson 数据 +- `candidate_lessons.jsonl`:LLM 提议但需晋升的候选经验 +- `queue/`:待学习的脱敏 packet +- `learned_packets/`:已经学过的 packet + +### 路径自动发现 + +函数:`discover_codex_session_roots` + +目标是避免把 `C:\Users\Administrator\.codex\sessions` 写死。查找优先级: + +1. `$CODEX_SESSIONS_DIR` +2. `$CODEX_HOME\sessions` +3. `Path.home() / ".codex" / "sessions"` +4. `%APPDATA%\Codex\sessions` +5. `%APPDATA%\.codex\sessions` +6. `%LOCALAPPDATA%\Codex\sessions` +7. `%LOCALAPPDATA%\.codex\sessions` +8. 
当前工作区向上几层的 `.codex\sessions` + +实现上只返回存在的目录,并做大小写不敏感去重。 + +CLI 的 `scan/prepare/run` 默认参数是 `DEFAULT_ROOT_SENTINEL`,没有传路径时会调用 `_resolve_roots()` 自动发现。 + +验证结果: + +```text +python .\memory\codex_session_distill.py scan +roots=C:\Users\Administrator\.codex\sessions found=21 +``` + +### JSONL 解析与 packet + +核心类:`SessionPacket` + +字段包括: + +- `session_hash` +- `source` +- `size` +- `mtime` +- `cwd` +- `user_goals` +- `tool_counts` +- `signals` +- `lessons` +- `quality` +- `focus` + +核心函数:`extract_session_packet(path, focus="workflow")` + +它会流式读取 JSONL,只关心 Codex 结构中的这些事件: + +- `session_meta` +- `event_msg` +- `response_item` +- `function_call` +- `function_call_output` +- `custom_tool_call` +- `custom_tool_call_output` + +它不会保存原始对话,而是提取压缩信号: + +- 是否有 `rg` +- 是否读仓库信息 +- 是否有 patch +- 是否运行测试/lint/type check +- 是否失败后恢复成功 + +脱敏函数:`redact_text` + +处理: + +- `sk-...` +- GitHub token +- Bearer token +- `api_key/token/authorization/cookie/secret/password` +- Windows 用户目录 +- macOS/Linux 用户目录 +- 长文本截断 + +### 规则层 lesson + +第一版内置 4 条确定性规则: + +1. `repo_probe_before_edit` + - 触发:先有仓库探测/快速搜索,再有 patch。 + - 做法:改代码前先看项目规范、状态、相关代码,做最小补丁。 + +2. `prefer_fast_text_search` + - 触发:命令中出现 `rg` / ripgrep。 + - 做法:优先快速文本搜索定位文件、符号、测试。 + +3. `verify_changes_before_done` + - 触发:有 patch 且运行测试/lint/type check。 + - 做法:完成前必须验证。 + +4. 
`failure_recovery_with_new_information` + - 触发:先看到失败输出,后看到成功输出。 + - 做法:失败后读取错误、补充状态、再切换策略,不重复同一动作。 + +这些规则的价值是高置信、低噪声,但覆盖面有限。 + +### 进度账本 + +核心类:`DistillState` + +```python +class DistillState: + def __init__(self, root: Path | str = DEFAULT_STATE_DIR): + self.root = Path(root) + self.progress_path = self.root / "progress.json" + self.lessons_path = self.root / "lessons.jsonl" + self.candidates_path = self.root / "candidate_lessons.jsonl" + self.queue_dir = self.root / "queue" + self.learned_dir = self.root / "learned_packets" +``` + +`prepare_sessions()` 中用 `sha256` 做 session key: + +```text +sha256: +``` + +若某 session 已经是 `prepared` / `learned` / `skipped` 且 size 未变,则跳过,避免反复学习同一个 JSONL。 + +### 扫描不是随机采样 + +当前策略不是随机: + +- 传月份目录时,只扫描该目录。 +- 不传路径时,扫描自动发现到的 sessions 根目录。 +- 文件按路径稳定排序。 +- 取前 N 个未处理且达到质量门槛的 JSONL。 + +这点写入了 `memory/codex_session_distill_sop.md`,避免误解。 + +## LLM 参与的深度蒸馏 + +### 候选写入工具 + +核心函数:`codex_lesson_update` + +签名: + +```python +def codex_lesson_update( + state: DistillState | None = None, + *, + title: str, + guidance: str, + category: str = "workflow", + evidence: Iterable[str] | None = None, + source_hash: str = "", + confidence: float = 0.5, +) -> dict: +``` + +职责: + +- 接收 LLM 从脱敏 packet 中提议的一条经验。 +- 在写入前检查敏感内容。 +- 对 `title/guidance/category/evidence` 再做脱敏。 +- 规范化 category。 +- 生成稳定 lesson id。 +- 合并已有 candidate。 +- 增加 `evidence_count` 和 `confidence_max`。 +- 写入 `candidate_lessons.jsonl`。 + +敏感拒绝发生在脱敏前: + +```python +if _has_sensitive_text(title, guidance, category, " ".join(str(x) for x in (evidence or []))): + return {"status": "rejected", "reason": "sensitive_content_detected"} +``` + +这样不会出现“先脱敏导致无法判断原文是否敏感”的问题。 + +### lesson id + +函数:`_candidate_id(category, title)` + +一般用英文 slug: + +```text +workflow_ground_documentation_in_project_commit +``` + +对中文标题中出现“未提交/worktree”的情况做了特殊稳定映射: + +```text +git_protect_user_worktree +``` + +这是为了让“保护用户未提交改动”这类高价值经验有稳定 id。 + +### 晋升候选 + +函数:`promote_candidates` + +默认规则: + +```python +min_evidence=2 
+min_confidence=0.85 +``` + +满足任一条件时可晋升: + +- `evidence_count >= min_evidence` +- `confidence_max >= min_confidence` + +晋升会写入 `lessons.jsonl`,然后 `render_sop()` 渲染 `codex_coding_sop.md`。 + +### CLI + +新增子命令: + +```powershell +python ../memory/codex_session_distill.py candidate ` + --title "..." ` + --guidance "..." ` + --category workflow ` + --evidence fast_search ` + --source-hash abc123 ` + --confidence 0.8 +``` + +```powershell +python ../memory/codex_session_distill.py promote +``` + +保留原有: + +```powershell +scan +prepare +learn +render +run +status +``` + +## GenericAgent 工具接入 + +### `ga.py` + +新增 handler 方法:`do_codex_lesson_update` + +位置:`ga.py:519` + +逻辑: + +```python +def do_codex_lesson_update(self, args, response): + from memory.codex_session_distill import DistillState, codex_lesson_update + state_dir = args.get("state_dir") or os.path.join(script_dir, "memory", "codex_distill") + evidence = args.get("evidence") or args.get("evidence_signals") or [] + if isinstance(evidence, str): + evidence = [evidence] + result = codex_lesson_update( + DistillState(state_dir), + title=args.get("title", ""), + guidance=args.get("guidance", ""), + category=args.get("category", "workflow"), + evidence=evidence, + source_hash=args.get("source_hash", ""), + confidence=args.get("confidence", 0.5), + ) +``` + +返回: + +- 成功:`candidate_recorded` +- 失败:`rejected` + +并把结果作为工具结果返回给 LLM,让 LLM 能继续修正或执行 promote。 + +### 工具 schema + +修改: + +- `assets/tools_schema.json` +- `assets/tools_schema_cn.json` + +新增工具: + +```json +{ + "name": "codex_lesson_update", + "description": "Record one LLM-proposed reusable coding lesson from a redacted Codex packet...", + "parameters": { + "type": "object", + "properties": { + "title": {"type": "string"}, + "guidance": {"type": "string"}, + "category": {"type": "string"}, + "evidence": {"type": "array", "items": {"type": "string"}}, + "source_hash": {"type": "string"}, + "confidence": {"type": "number"} + } + } +} +``` + +这让 GA 
可以通过标准工具调用协议完成候选经验写入,而不是直接编辑正式 SOP。 + +## SOP 与索引 + +### `memory/codex_session_distill_sop.md` + +包含两种流程。 + +快速规则蒸馏: + +```powershell +python ../memory/codex_session_distill.py status +python ../memory/codex_session_distill.py run --limit 3 +``` + +LLM 深度蒸馏: + +```powershell +python ../memory/codex_session_distill.py prepare --limit 1 +``` + +然后 GA 读取 `codex_distill/queue/*.md`,基于脱敏 packet 调用: + +```text +codex_lesson_update +``` + +最后: + +```powershell +python ../memory/codex_session_distill.py promote +``` + +### `memory/codex_coding_sop.md` + +这是最终渲染出的经验 SOP。目前包含: + +- failure recovery +- verify before done +- fast text search +- probe before edit +- protect user worktree +- ground documentation in commit history + +### L1 索引 + +修改:`assets/global_mem_insight_template.txt` + +新增: + +```text +L3: ... | codex_session_distill_sop +编码经验:codex_coding_sop +``` + +## `.gitignore` + +仓库默认忽略 `memory/*`,所以需要显式放行可复用文件: + +```gitignore +!memory/codex_session_distill.py +!memory/codex_session_distill_sop.md +!memory/codex_coding_sop.md +``` + +但继续忽略: + +```text +memory/codex_distill/* +temp/* +``` + +这样不会提交本机会话处理进度和 E2E 输出。 + +## 测试 + +### `tests/test_codex_session_distill.py` + +覆盖: + +- 自动路径发现 +- JSONL 解析 +- 敏感内容脱敏 +- 规则 lesson 识别 +- progress 跳过已处理文件 +- 稳定路径顺序,不随机采样 +- lessons 合并与 SOP 渲染 +- `codex_lesson_update` 的校验、写候选、晋升 +- 敏感候选拒绝 + +### `tests/test_codex_lesson_update_tool.py` + +覆盖 GA handler 工具: + +- `GenericAgentHandler.do_codex_lesson_update` 能写入候选 +- 敏感 guidance 会被拒绝 + +由于 handler 方法是 generator,测试里使用 `_exhaust()` 消费 generator,模拟框架 dispatch 行为。 + +### 测试结果 + +最终验证: + +```text +python -m unittest discover -s tests +Ran 29 tests in 0.384s +OK +``` + +## E2E 过程 + +### 工具层 E2E + +执行: + +```powershell +python .\memory\codex_session_distill.py run --limit 1 +``` + +结果: + +```text +prepared=1 learned=1 rendered=...codex_coding_sop.md bytes=1446 +``` + +说明扫描、prepare、learn、render 闭环正常。 + +### GA 真实 task E2E:规则蒸馏 + +通过: + +```powershell +python .\agentmain.py --task 
codex_distill_e2e3 --llm_no 0 +``` + +GA 执行: + +1. 读 `codex_session_distill_sop.md` +2. 运行 `status` +3. 运行 `run --limit 1` +4. 再运行 `status` +5. 读 `codex_coding_sop.md` + +结果: + +```text +sessions=14 prepared=0 learned=8 skipped=6 lessons=4 +``` + +证明自动路径发现和规则蒸馏能由 GA 自己驱动。 + +### GA 真实 task E2E:LLM 深度蒸馏 + +通过: + +```powershell +python .\agentmain.py --task codex_lesson_update_e2e --llm_no 0 +``` + +GA 执行: + +1. `file_read` 读取 SOP +2. `code_run` 执行 `prepare --limit 1` +3. 读取最新 `queue/*.md` packet +4. LLM 从 packet 中发现新经验 +5. 调用 `codex_lesson_update` +6. 执行 `promote --min-evidence 1 --min-confidence 0.5` +7. 执行 `status` + +真实输出确认: + +```text +codex_lesson_update 成功,候选已记录 +promoted=1 +candidates=2 +lessons=6 +``` + +LLM 新发现并晋升的经验: + +```text +Ground documentation in project commit history +``` + +具体做法: + +```text +When generating contributor documentation (AGENTS.md/CONTRIBUTING.md), +first review recent commit history to understand actual team conventions, +workflow patterns, and practices, grounding documentation in real project +activity rather than assumptions. +``` + +这条经验不是 4 条规则之一,说明 LLM 参与层确实提升了灵活性。 + +## 本次 commit + +提交: + +```text +045688a feat: 增加 Codex 会话经验蒸馏 +``` + +提交内容: + +- `.gitignore` +- `assets/global_mem_insight_template.txt` +- `assets/tools_schema.json` +- `assets/tools_schema_cn.json` +- `ga.py` +- `memory/codex_coding_sop.md` +- `memory/codex_session_distill.py` +- `memory/codex_session_distill_sop.md` +- `tests/test_codex_lesson_update_tool.py` +- `tests/test_codex_session_distill.py` + +注意:本 dev idea 文档是在该 commit 之后补写,尚未包含在上述 commit 中。 + +## 当前状态与后续方向 + +当前机制已经实现: + +- 自动发现 Codex sessions +- 规则蒸馏 +- 进度账本 +- 脱敏 packet +- LLM 候选发现 +- 专用工具写入候选 +- 候选晋升 +- SOP 渲染 +- GA 真实 E2E + +后续可以继续增强: + +1. 更丰富的 packet 内容 + 当前 packet 主要保留用户目标、信号和规则 lesson。可以加入更结构化的“命令序列摘要”和“失败恢复路径”。 + +2. 更稳的候选相似度合并 + 目前主要靠 id 合并。未来可增加简单文本相似度,避免 LLM 用不同标题表达同一经验。 + +3. 更严格的晋升策略 + 现在 `promote` 支持 evidence/confidence 门槛。自主模式下应使用更保守门槛,E2E 才使用低门槛。 + +4. 
周期性自主学习 + 可把 `codex_session_distill_sop` 加入 autonomous task planning,让 GA 空闲时小批量学习。 + +5. 失败案例学习 + 目前更偏成功经验。可以单独维护 `negative_lessons`,记录“不要这么做”的失败模式。 + +6. 与 `memory_management_sop` 更紧密集成 + 目前 Codex coding SOP 是独立 L3。未来可让 GA 在 `start_long_term_update` 中主动查询 Codex coding SOP,作为编码任务的先验。 diff --git a/frontends/tui_input_history.py b/frontends/tui_input_history.py new file mode 100644 index 00000000..452c3d6d --- /dev/null +++ b/frontends/tui_input_history.py @@ -0,0 +1,49 @@ +class InputHistoryMixin: + """Small history navigator for Textual TextArea subclasses.""" + + def _init_input_history(self) -> None: + self._history: list[str] = [] + self._history_index: int | None = None + self._history_draft = "" + + def add_history(self, value: str) -> None: + value = (value or "").rstrip() + if not value: + self._history_index = None + self._history_draft = "" + return + if not self._history or self._history[-1] != value: + self._history.append(value) + self._history_index = None + self._history_draft = "" + + def show_previous_history(self) -> bool: + if not self._history: + return False + if self._history_index is None: + self._history_draft = self.text + self._history_index = len(self._history) - 1 + elif self._history_index > 0: + self._history_index -= 1 + self._set_history_text(self._history[self._history_index]) + return True + + def show_next_history(self) -> bool: + if self._history_index is None: + return False + if self._history_index < len(self._history) - 1: + self._history_index += 1 + self._set_history_text(self._history[self._history_index]) + return True + self._history_index = None + self._set_history_text(self._history_draft) + self._history_draft = "" + return True + + def _set_history_text(self, value: str) -> None: + self.text = value + lines = value.split("\n") + try: + self.move_cursor((len(lines) - 1, len(lines[-1]))) + except Exception: + pass diff --git a/frontends/tuiapp.py b/frontends/tuiapp.py index 07345d2c..8f88e44f 100644 --- 
a/frontends/tuiapp.py +++ b/frontends/tuiapp.py @@ -50,6 +50,8 @@ AgentFactory = Callable[[], Any] +from tui_input_history import InputHistoryMixin + @dataclass class ChatMessage: @@ -161,7 +163,7 @@ def default_agent_factory() -> Any: return agent -class PromptInput(TextArea): +class PromptInput(InputHistoryMixin, TextArea): """Multi-line input: Enter submits, Ctrl+Enter (ctrl+j) inserts newline, paste never auto-submits.""" BINDINGS = [ @@ -170,21 +172,29 @@ class PromptInput(TextArea): class Submitted(Message): """Posted when the user presses Enter to submit.""" - def __init__(self, value: str) -> None: + def __init__(self, input_area: "PromptInput", value: str) -> None: super().__init__() + self.input_area = input_area self.value = value def __init__(self, placeholder: str = "", **kwargs) -> None: super().__init__(language=None, show_line_numbers=False, compact=True, placeholder=placeholder, **kwargs) + self._init_input_history() def _on_key(self, event: events.Key) -> None: - if event.key == "enter": + if event.key == "up" and self.show_previous_history(): + event.stop() + event.prevent_default() + elif event.key == "down" and self.show_next_history(): + event.stop() + event.prevent_default() + elif event.key == "enter": # Enter → submit event.stop() event.prevent_default() value = self.text.rstrip() self.clear() - self.post_message(self.Submitted(value)) + self.post_message(self.Submitted(self, value)) elif event.key == "ctrl+j": # Ctrl+Enter (ctrl+j) → insert newline event.stop() @@ -307,6 +317,7 @@ def on_prompt_input_submitted(self, event: PromptInput.Submitted) -> None: if not value: self._system("Empty input ignored. 
Type /help for commands.") return + event.input_area.add_history(value) parsed = parse_local_command(value) if parsed: cmd, args = parsed diff --git a/frontends/tuiapp_v2.py b/frontends/tuiapp_v2.py index d95d8667..f89983b1 100644 --- a/frontends/tuiapp_v2.py +++ b/frontends/tuiapp_v2.py @@ -111,9 +111,17 @@ def _soft_to_hard(tokens): # Side-effect imports: activate /btw + /continue monkey-patches on GenericAgent import chatapp_common # noqa: F401 +from tui_input_history import InputHistoryMixin +from llmcore import reload_mykeys from chatapp_common import format_restore from btw_cmd import handle_frontend_command as btw_handle -from continue_cmd import handle_frontend_command as continue_handle, list_sessions as continue_list, extract_ui_messages as continue_extract +from continue_cmd import ( + handle_frontend_command as continue_handle, + list_sessions as continue_list, + extract_ui_messages as continue_extract, + reset_conversation as continue_reset, + restore as continue_restore, +) from export_cmd import last_assistant_text, export_to_temp, wrap_for_clipboard AgentFactory = Callable[[], Any] @@ -220,7 +228,7 @@ def get_selection(self, selection): return selection.extract("\n".join(lines)), "\n" -class InputArea(TextArea): +class InputArea(InputHistoryMixin, TextArea): """多行输入框:Enter 发送 / Ctrl+J 等换行 / 粘贴 >2 行收为 [Pasted text #N +M lines]。""" _PASTE_RE = re.compile(r'\[Pasted text #(\d+) \+\d+ lines\]') @@ -247,6 +255,7 @@ def __init__(self, *args, **kwargs) -> None: super().__init__(*args, **kwargs) self._pastes: dict[int, str] = {} self._paste_counter = 0 + self._init_input_history() def expand_placeholders(self, text: str) -> str: def repl(m): @@ -292,6 +301,10 @@ async def _on_key(self, event: events.Key) -> None: fn = routes.get(event.key) if fn: fn(); event.stop(); event.prevent_default(); return + if event.key == "up" and self.show_previous_history(): + event.stop(); event.prevent_default(); return + if event.key == "down" and self.show_next_history(): + 
event.stop(); event.prevent_default(); return if event.key == "enter": # 换行键已被 BINDINGS 拦走 event.stop(); event.prevent_default() self.post_message(self.Submitted(self, self.text)) @@ -362,6 +375,14 @@ def _truncate(text: str, max_w: int) -> str: return "".join(out) +def _rel_time(mtime: float) -> str: + d = int(time.time() - mtime) + if d < 60: return f"{d}秒前" + if d < 3600: return f"{d // 60}分前" + if d < 86400: return f"{d // 3600}小时前" + return f"{d // 86400}天前" + + def _sidebar_last_user(sess: AgentSession) -> str: for m in reversed(sess.messages): if m.role == "user": @@ -379,7 +400,46 @@ def _sidebar_last_summary(sess: AgentSession) -> str: return "" -def render_sidebar(sessions: dict[int, AgentSession], current_id: Optional[int]) -> Table: +def _session_rows(sess: AgentSession) -> int: + rows = 3 + if _sidebar_last_user(sess): rows += 1 + if _sidebar_last_summary(sess): rows += 1 + return rows + + +def _tui_recent_sessions_limit(mykeys: Optional[dict[str, Any]] = None) -> int: + if mykeys is None: + try: mykeys = reload_mykeys()[0] + except Exception: mykeys = {} + try: limit = int((mykeys or {}).get("tui_recent_sessions_limit", 10)) + except Exception: limit = 10 + return limit if limit > 0 else 10 + + +def _recent_sidebar_sessions(sessions, limit: int): + return list(sessions or [])[:max(0, int(limit or 0))] + + +def _clamp_sidebar_width(width: int, screen_width: int) -> int: + max_width = min(70, max(24, int(screen_width) - 40)) + return max(24, min(int(width), max_width)) + + +def _is_sidebar_resizer_hit(screen_x: int, boundary_x: int) -> bool: + return int(screen_x) in {int(boundary_x) - 1, int(boundary_x), int(boundary_x) + 1} + + +def _recent_preview_width(sidebar_width: int) -> int: + # Sidebar padding + index/status columns consume roughly 18 cells. 
+ return max(12, int(sidebar_width) - 22) + + +def render_sidebar( + sessions: dict[int, AgentSession], + current_id: Optional[int], + recent_sessions=None, + width: int = 34, +) -> Table: outer = Table.grid(expand=True) outer.add_column() @@ -414,6 +474,28 @@ def preview(label, txt, style): outer.add_row(Text("SESSIONS", style=f"bold {C_DIM}")) outer.add_row(Text("")) outer.add_row(sess_tbl) + recent = list(recent_sessions or []) + if recent: + preview_width = _recent_preview_width(width) + recent_tbl = Table.grid(expand=True) + recent_tbl.add_column(width=2) + recent_tbl.add_column(width=2) + recent_tbl.add_column(ratio=1, no_wrap=True, overflow="ellipsis") + recent_tbl.add_column(justify="right") + recent_tbl.add_column(width=2) + for idx, (_, mtime, first, n) in enumerate(recent, 1): + preview_text = re.sub(r"\s+", " ", first or "(无法预览)").strip() + recent_tbl.add_row( + blank, + Text(str(idx), style=C_DIM), + Text(_truncate(preview_text, preview_width), style=C_MUTED), + Text(f"{_rel_time(mtime)} · {n}轮", style=C_DIM), + blank, + ) + outer.add_row(Text("")) + outer.add_row(Text("RECENT", style=f"bold {C_DIM}")) + outer.add_row(Text("")) + outer.add_row(recent_tbl) return outer @@ -437,9 +519,18 @@ class GenericAgentTUI(App[None]): height: 100%; background: #0d1117; padding: 1 2; - border-right: solid #21262d; + border-right: solid #30363d; } #sidebar.-hidden, #sidebar.-narrow { display: none; } + #sidebar-resizer { + width: 1; + height: 100%; + background: #0d1117; + } + #sidebar-resizer:hover { + background: #161b22; + } + #sidebar-resizer.-hidden, #sidebar-resizer.-narrow { display: none; } #main { height: 100%; @@ -535,11 +626,16 @@ def __init__(self, agent_factory: Optional[AgentFactory] = None) -> None: self._suppress_palette_open = False # 选中 option 后抑制下一次 on_input_changed 重开 palette self.fold_mode: bool = True # 折叠已完成的 turn,Ctrl+F 切 self._last_width: int = -1 # 轮询时检测变化用(Windows 窗口吸附/全屏不发 resize 时的兜底) + self._recent_sessions_limit: int = 
_tui_recent_sessions_limit() + self._recent_sessions: list = [] + self._sidebar_width: Optional[int] = None + self._resizing_sidebar: bool = False def compose(self) -> ComposeResult: yield Static("", id="topbar") with Horizontal(id="body"): yield Static("", id="sidebar") + yield Static("", id="sidebar-resizer") with Vertical(id="main"): yield VerticalScroll(id="messages") yield OptionList(id="palette") @@ -572,9 +668,12 @@ def _tick(self) -> None: def _patch_auto_scroll_for_selection(self) -> None: """让选区拖拽到 #input 上时仍能滚动 #messages:把 _select_start 也当候选源,鼠标在 scrollable 下/上方也触发。""" - from textual._auto_scroll import get_auto_scroll_regions - from textual.geometry import Offset - from textual.widget import Widget as _W + try: + from textual._auto_scroll import get_auto_scroll_regions + from textual.geometry import Offset + from textual.widget import Widget as _W + except ModuleNotFoundError: + return screen = self.screen app = self @@ -672,6 +771,7 @@ def action_stop_current(self) -> None: def action_toggle_sidebar(self) -> None: self.query_one("#sidebar", Static).toggle_class("-hidden") + self.query_one("#sidebar-resizer", Static).toggle_class("-hidden") def action_toggle_fold(self) -> None: self.fold_mode = not self.fold_mode @@ -702,7 +802,7 @@ def action_complete_command(self) -> None: palette.action_select() def on_click(self, event: events.Click) -> None: - """点击侧栏会话条目 → 切换。""" + """点击侧栏会话条目 → 切换;点击 RECENT 条目 → 恢复历史会话。""" try: sidebar = self.query_one("#sidebar", Static) except Exception: @@ -715,15 +815,57 @@ def on_click(self, event: events.Click) -> None: if y < 0: return for sid, sess in self.sessions.items(): - rows = 3 # top spacer + name + bottom spacer - if _sidebar_last_user(sess): rows += 1 - if _sidebar_last_summary(sess): rows += 1 + rows = _session_rows(sess) if y < rows: if sid != self.current_id: self.current_id = sid self._refresh_all() return y -= rows + # RECENT 区块:空行 + "RECENT" + 空行,然后每条历史 1 行。 + y -= 3 + if 0 <= y < 
len(self._recent_sessions): + self._restore_recent_session(y) + + def on_mouse_down(self, event: events.MouseDown) -> None: + if getattr(event, "button", None) != 1: + return + try: + sidebar = self.query_one("#sidebar", Static) + resizer = self.query_one("#sidebar-resizer", Static) + except Exception: + return + boundary_x = int(getattr(resizer.region, "x", 0) or (sidebar.region.x + sidebar.region.width)) + if event.widget is not resizer and not _is_sidebar_resizer_hit(event.screen_x, boundary_x): + return + self._resizing_sidebar = True + event.stop() + event.prevent_default() + + def on_mouse_move(self, event: events.MouseMove) -> None: + if not self._resizing_sidebar: + return + try: + sidebar = self.query_one("#sidebar", Static) + origin_x = int(sidebar.region.x) + except Exception: + origin_x = 0 + width = _clamp_sidebar_width(int(event.screen_x) - origin_x, self.size.width) + self._sidebar_width = width + try: + self.query_one("#sidebar", Static).styles.width = width + except Exception: + pass + self._remount_current_session() + event.stop() + event.prevent_default() + + def on_mouse_up(self, event: events.MouseUp) -> None: + if not self._resizing_sidebar: + return + self._resizing_sidebar = False + event.stop() + event.prevent_default() # ---------------- input + palette ---------------- def on_resize(self, event) -> None: @@ -735,6 +877,7 @@ def _apply_responsive_layout(self) -> None: """按终端宽度调侧栏宽 + 主区横向 padding。<70 列隐藏侧栏,宽屏按比例放大。""" try: sidebar = self.query_one("#sidebar", Static) + resizer = self.query_one("#sidebar-resizer", Static) main = self.query_one("#main", Vertical) except Exception: return @@ -743,9 +886,12 @@ def _apply_responsive_layout(self) -> None: # 自动隐藏走 -narrow 类,跟用户手动 Ctrl+B 切的 -hidden 互不干扰 if w < 70: sidebar.add_class("-narrow") + resizer.add_class("-narrow") else: sidebar.remove_class("-narrow") - sidebar.styles.width = max(30, min(50, w // 5)) + resizer.remove_class("-narrow") + default_width = max(30, min(50, w // 5)) + 
sidebar.styles.width = _clamp_sidebar_width(self._sidebar_width or default_width, w) main.styles.padding = (1, 2) if w < 90 else (1, 6) self._remount_current_session() # 宽度变了 → markdown 要按新宽重渲 @@ -802,6 +948,7 @@ def on_input_area_submitted(self, event: "InputArea.Submitted") -> None: self._resize_input(inp) if not text: return + inp.add_history(text) if text.startswith("/"): parts = text.split(maxsplit=1) cmd = parts[0][1:].lower() @@ -1097,6 +1244,24 @@ def _cmd_continue(self, args, raw): self._system(result) self._refresh_all() + def _restore_recent_session(self, idx: int) -> None: + if not (0 <= idx < len(self._recent_sessions)): + return + path = self._recent_sessions[idx][0] + sess = self.current + try: + continue_reset(sess.agent, message=None) + result, _ = continue_restore(sess.agent, path) + except Exception as e: + result = f"❌ /continue 失败: {e}" + if result.startswith("✅"): + sess.messages.clear() + for h in continue_extract(path): + sess.messages.append(ChatMessage(role=h["role"], content=h["content"])) + self._remount_current_session() + self._system(result) + self._refresh_all() + def _cmd_export(self, args): sess = self.current sub = args[0].lower() if args else "" @@ -1242,7 +1407,18 @@ def _refresh_topbar(self): def _refresh_sidebar(self): if not self.is_mounted: return - self.query_one("#sidebar", Static).update(render_sidebar(self.sessions, self.current_id)) + try: + recent = continue_list(exclude_pid=os.getpid()) + except Exception: + recent = [] + self._recent_sessions = _recent_sidebar_sessions(recent, self._recent_sessions_limit) + try: + sidebar_width = int(self.query_one("#sidebar", Static).region.width) + except Exception: + sidebar_width = self._sidebar_width or 34 + self.query_one("#sidebar", Static).update( + render_sidebar(self.sessions, self.current_id, self._recent_sessions, width=sidebar_width) + ) def _refresh_messages(self): if not self.is_mounted or self.current_id is None: return diff --git a/ga.py b/ga.py index 
f040cf08..ed65443d 100644 --- a/ga.py +++ b/ga.py @@ -516,6 +516,32 @@ def do_start_long_term_update(self, args, response): else: result = "Memory Management SOP not found. Do not update memory." return StepOutcome(result, next_prompt=prompt) + def do_codex_lesson_update(self, args, response): + '''记录从Codex蒸馏packet中由LLM提议的候选经验;工具负责校验、去重和候选落盘。''' + try: + from memory.codex_session_distill import DistillState, codex_lesson_update + except Exception: + import codex_session_distill + DistillState = codex_session_distill.DistillState + codex_lesson_update = codex_session_distill.codex_lesson_update + state_dir = args.get("state_dir") or os.path.join(script_dir, "memory", "codex_distill") + evidence = args.get("evidence") or args.get("evidence_signals") or [] + if isinstance(evidence, str): evidence = [evidence] + result = codex_lesson_update( + DistillState(state_dir), + title=args.get("title", ""), + guidance=args.get("guidance", ""), + category=args.get("category", "workflow"), + evidence=evidence, + source_hash=args.get("source_hash", ""), + confidence=args.get("confidence", 0.5), + ) + if result.get("status") == "candidate_recorded": + yield f"[Info] Codex lesson candidate recorded: {result['candidate'].get('id')}\n" + else: + yield f"[Warn] Codex lesson candidate rejected: {result.get('reason')}\n" + return StepOutcome(result, next_prompt=self._get_anchor_prompt(skip=args.get('_index', 0) > 0)) + def _fold_earlier(self, lines): FALLBACK = '直接回答了用户问题' parts, cnt, last = [], 0, '' diff --git a/llmcore.py b/llmcore.py index bf239e7e..0b2dfdcf 100644 --- a/llmcore.py +++ b/llmcore.py @@ -500,7 +500,11 @@ def _msgs_claude2oai(messages): text_parts.append({"type": "image_url", "image_url": {"url": f"data:{src.get('media_type', 'image/png')};base64,{src.get('data', '')}"}}) elif b.get("type") == "image_url": text_parts.append(b) elif b.get("type") == "text" and b.get("text"): text_parts.append({"type": "text", "text": b.get("text", "")}) - if text_parts: 
result.append({"role": "user", "content": text_parts}) + if text_parts: + if all(p.get("type") == "text" for p in text_parts): + result.append({"role": "user", "content": "\n".join(p.get("text", "") for p in text_parts)}) + else: + result.append({"role": "user", "content": text_parts}) else: result.append(msg) return result @@ -696,6 +700,11 @@ def ask(self, msg): return MockResponse(thinking, content, tool_calls, str(content_blocks)) class NativeOAISession(NativeClaudeSession): + def __init__(self, cfg): + super().__init__(cfg) + self.native_image_input = bool(cfg.get("native_image_input", False)) + self.native_tools = bool(cfg.get("native_tools", True)) + def raw_ask(self, messages): messages = _fix_messages(messages) messages = _ensure_thinking_blocks(messages, self.model) @@ -969,19 +978,27 @@ def __init__(self, backend): self.backend.system = self._thinking_prompt() self.name = self.backend.name self._pending_tool_ids = [] + self.last_tools = '' self.log_path = None - def set_system(self, extra_system): + def _uses_native_tools(self): + return bool(getattr(self.backend, 'native_tools', True)) + def _text_tool_instruction(self, tools): + return ToolClient(self.backend, auto_save_tokens=False)._prepare_tool_instruction(tools) + def set_system(self, extra_system, tools=None): combined = f"{extra_system}\n\n{self._thinking_prompt()}" if extra_system else self._thinking_prompt() + if tools and not self._uses_native_tools(): + combined = f"{combined}\n\n{self._text_tool_instruction(tools)}" if combined != self.backend.system: print(f"[Debug] Updated system prompt, length {len(combined)} chars.") self.backend.system = combined def chat(self, messages, tools=None): - if tools: self.backend.tools = tools + if tools and self._uses_native_tools(): self.backend.tools = tools + else: self.backend.tools = None if not self.backend.history: self._pending_tool_ids = [] combined_content = []; resp = None; tool_results = [] for msg in messages: c = msg.get('content', '') if 
msg['role'] == 'system': - self.set_system(c); continue + self.set_system(c, tools); continue if isinstance(c, str): combined_content.append({"type": "text", "text": c}) elif isinstance(c, list): combined_content.extend(c) if msg['role'] == 'user' and msg.get('tool_results'): tool_results.extend(msg['tool_results']) @@ -994,8 +1011,12 @@ def chat(self, messages, tools=None): for tid in self._pending_tool_ids: if tid not in tr_id_set: tool_result_blocks.append({"type": "tool_result", "tool_use_id": tid, "content": ""}) self._pending_tool_ids = [] - # Filter whitespace-only text blocks that cause 400 on strict API proxies - filtered_content = [c for c in combined_content if c.get("text", "").strip()] + # Filter whitespace-only text blocks that cause 400 on strict API proxies, + # but keep native multimodal blocks such as image/image_url. + filtered_content = [ + c for c in combined_content + if c.get("type") != "text" or c.get("text", "").strip() + ] final_content = tool_result_blocks + filtered_content if not final_content: final_content = [{"type": "text", "text": "."}] merged = {"role": "user", "content": final_content} diff --git a/memory/codex_coding_sop.md b/memory/codex_coding_sop.md new file mode 100644 index 00000000..20589b3d --- /dev/null +++ b/memory/codex_coding_sop.md @@ -0,0 +1,36 @@ +# Codex Coding SOP + +来源:由 `codex_session_distill.py` 从本机 Codex JSONL 会话生成脱敏证据包,再由 LLM 读取 packet 并通过 `codex_lesson_update` 提议、校验、晋升。本文只保留跨项目编码工作法,不保存原始对话、密钥、私有路径或一次性业务事实。 + +## 使用原则 +- 先按当前仓库规范执行;本 SOP 只提供编码协作习惯和避坑策略。 +- 经验应来自 LLM 对脱敏 packet 的分析;规则命中的 seed observation 只作为证据提示,不能单独成为正式经验。 +- 经验有独立证据计数,证据越多优先级越高;单次会话经验只作为弱提示。 +- 若本 SOP 与项目 AGENTS/CONTRIBUTING/用户指令冲突,以上游明确指令为准。 + +## debugging +### Recover from failures by adding information +- 做法:On failure, read the error, gather new state, then change strategy; avoid repeating the same command without new evidence. 
+- 证据: 12 | signals: failure, recovery + +## testing +### Verify changed behavior before claiming completion +- 做法:After edits, run the focused test or project verification command and report failures instead of assuming success. +- 证据: 9 | signals: patch, verification, verification_success + +## workflow +### Probe repository facts before editing +- 做法:For coding work, inspect project rules, status, and nearby code before making the smallest viable patch. +- 证据: 15 | signals: fast_search, git_status, patch, repo_probe + +### Use fast text search to orient in code +- 做法:Reach for precise repository search before broad directory traversal when locating files, symbols, or tests. +- 证据: 12 | signals: fast_search + +### Distill pipeline has dual-channel lesson extraction +- 做法:codex distill has two independent pipelines: (1) rule-based extract_session_packet() scans for 4 hardcoded behavior signals, (2) LLM-proposed codex_lesson_update tool records any pattern the agent deems valuable during live execution. Candidates need evidence_count>=2 or confidence>=0.85 to promote to forma ...[cut] +- 证据: 1 | signals: codex_lesson_update_found_in_ga.py, extract_session_packet_4_patterns_confirmed, promote_threshold_verified + +### Separate extraction, validation, and publication +- 做法:When a workflow is underperforming, split responsibilities into separate stages: one stage gathers/desensitizes evidence, one stage abstracts reusable lessons, and one stage validates/merges/renders the result. Avoid mixing generation and verification in the same step. 
+- 证据: 1 | signals: packet shows historical distillation became richer after separating脱 ...[cut], packet summary identifies职责放错是根因 rather than tuning thresholds, packet timeline includes brainstorming, TDD, systematic-debugging, v ...[cut] diff --git a/memory/codex_session_distill.py b/memory/codex_session_distill.py new file mode 100644 index 00000000..d0a55495 --- /dev/null +++ b/memory/codex_session_distill.py @@ -0,0 +1,1091 @@ +"""Distill reusable coding lessons from Codex JSONL sessions. + +The tool intentionally stores compressed, redacted lessons instead of raw +conversation text. It is designed for GA memory use: progress is tracked so +old sessions are skipped, while repeated high-value patterns increase lesson +confidence instead of duplicating SOP text. +""" +from __future__ import annotations + +import argparse +import datetime as _dt +import hashlib +import json +import os +import re +import shutil +from dataclasses import dataclass, field +from pathlib import Path +from typing import Iterable + + +DEFAULT_STATE_DIR = Path(__file__).resolve().parent / "codex_distill" +DEFAULT_SOP_PATH = Path(__file__).resolve().parent / "codex_coding_sop.md" +DEFAULT_CODEX_ROOT = Path.home() / ".codex" / "sessions" +DEFAULT_ROOT_SENTINEL = "__AUTO_CODEX_SESSIONS__" +FOCUS_ORDER = ("workflow", "debugging", "testing", "communication") +MAX_SNIPPET = 220 +VALID_CATEGORIES = {"workflow", "debugging", "testing", "git", "security", "frontend", "planning", "communication", "quality"} + +SECRET_PATTERNS = [ + re.compile(r"sk-[A-Za-z0-9_-]{10,}"), + re.compile(r"gh[pousr]_[A-Za-z0-9_]{12,}"), + re.compile(r"(?i)\b(Bearer)\s+[A-Za-z0-9._~+/=-]{8,}"), + re.compile(r"(?i)\b(api[_-]?key|token|authorization|cookie|secret|password)\b\s*[:=]\s*['\"]?[^'\"\s,;]{4,}"), +] +PATH_PATTERNS = [ + re.compile(r"[A-Za-z]:\\Users\\[^\\\s]+(?:\\[^\s\"']*)?"), + re.compile(r"/Users/[^/\s]+(?:/[^\s\"']*)?"), + re.compile(r"/home/[^/\s]+(?:/[^\s\"']*)?"), +] + +TEST_RE = re.compile( + 
r"\b(pytest|unittest|npm\s+test|pnpm\s+test|yarn\s+test|go\s+test|cargo\s+test|mvn\s+test|gradle\s+test|ruff|mypy|tsc|eslint)\b", + re.I, +) +FAST_SEARCH_RE = re.compile(r"(^|\s)(rg|ripgrep)\s+", re.I) +READ_RE = re.compile(r"\b(Get-Content|type|cat|sed|nl|git\s+show|git\s+diff|git\s+status)\b", re.I) + + +def _utc_now() -> str: + return _dt.datetime.now(_dt.timezone.utc).replace(microsecond=0).isoformat() + + +def redact_text(text: object, max_len: int = MAX_SNIPPET) -> str: + """Remove secrets and user-local paths, then compact long text.""" + if text is None: + return "" + redacted = str(text).replace("\r\n", "\n").replace("\r", "\n") + for pattern in SECRET_PATTERNS: + redacted = pattern.sub("", redacted) + for pattern in PATH_PATTERNS: + redacted = pattern.sub("", redacted) + redacted = re.sub(r"\s+", " ", redacted).strip() + if len(redacted) > max_len: + redacted = redacted[: max_len - 12].rstrip() + " ...[cut]" + return redacted + + +def _load_jsonl(path: Path) -> Iterable[dict]: + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line_no, line in enumerate(f, 1): + line = line.strip() + if not line: + continue + try: + yield json.loads(line) + except json.JSONDecodeError: + yield {"type": "parse_error", "payload": {"line": line_no}} + + +def file_sha256(path: Path) -> str: + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(1024 * 1024), b""): + h.update(chunk) + return h.hexdigest() + + +def _dedupe_existing_dirs(paths: Iterable[Path]) -> list[Path]: + seen = set() + result = [] + for path in paths: + try: + resolved = path.expanduser().resolve() + except OSError: + resolved = path.expanduser().absolute() + key = str(resolved).lower() if os.name == "nt" else str(resolved) + if key in seen or not resolved.is_dir(): + continue + seen.add(key) + result.append(resolved) + return result + + +def discover_codex_session_roots( + home: Path | str | None = None, + appdata: Path | str | None = None, + 
localappdata: Path | str | None = None, + cwd: Path | str | None = None, + env: dict[str, str] | None = None, + max_cwd_parent_depth: int = 4, +) -> list[Path]: + """Find likely Codex session directories across machines. + + Search is conservative and only returns existing directories. The explicit + CODEX_SESSIONS_DIR environment variable wins, then common per-user paths, + then .codex/sessions found while walking upward from the current workspace. + """ + env = os.environ if env is None else env + candidates: list[Path] = [] + if env.get("CODEX_SESSIONS_DIR"): + candidates.append(Path(env["CODEX_SESSIONS_DIR"])) + if env.get("CODEX_HOME"): + candidates.append(Path(env["CODEX_HOME"]) / "sessions") + + home_path = Path(home) if home is not None else Path.home() + candidates.append(home_path / ".codex" / "sessions") + + appdata_value = appdata if appdata is not None else env.get("APPDATA") + localappdata_value = localappdata if localappdata is not None else env.get("LOCALAPPDATA") + for base in (appdata_value, localappdata_value): + if base: + base_path = Path(base) + candidates.extend([ + base_path / "Codex" / "sessions", + base_path / ".codex" / "sessions", + ]) + + cwd_path = Path(cwd) if cwd is not None else Path.cwd() + for parent in [cwd_path, *list(cwd_path.parents)[:max_cwd_parent_depth]]: + candidates.append(parent / ".codex" / "sessions") + + return _dedupe_existing_dirs(candidates) + + +def _parse_arguments(raw: object) -> dict: + if isinstance(raw, dict): + return raw + if not isinstance(raw, str) or not raw.strip(): + return {} + try: + data = json.loads(raw) + return data if isinstance(data, dict) else {} + except json.JSONDecodeError: + return {"raw": raw} + + +def _payload_text(payload: dict) -> str: + if not isinstance(payload, dict): + return "" + if payload.get("message"): + return str(payload.get("message")) + content = payload.get("content") + if isinstance(content, str): + return content + if isinstance(content, list): + parts = [] + for block 
in content: + if isinstance(block, dict): + parts.append(str(block.get("text") or block.get("content") or "")) + return "\n".join(p for p in parts if p) + return "" + + +def _command_signal(command: str) -> set[str]: + signals = set() + if FAST_SEARCH_RE.search(command): + signals.add("fast_search") + if TEST_RE.search(command): + signals.add("verification") + if READ_RE.search(command): + signals.add("repo_probe") + if "git status" in command.lower(): + signals.add("git_status") + return signals + + +def _lesson(lesson_id: str, category: str, title: str, guidance: str, signals: Iterable[str]) -> dict: + return { + "id": lesson_id, + "category": category, + "title": title, + "guidance": guidance, + "signals": sorted(set(signals)), + } + + +def _build_llm_distill_prompt(session_hash: str, focus: str = "workflow") -> str: + source = session_hash[:16] if session_hash else "" + return "\n".join([ + "Read this redacted Codex session packet as reusable coding experience, not as raw memory.", + "Your job is to discover 0-3 cross-project lessons that are richer than the deterministic seed observations.", + "For each useful lesson, call `codex_lesson_update` with a short title, concrete reusable guidance, category, evidence signals, source_hash, and confidence.", + "Use only evidence visible in this packet. 
Do not store private paths, raw logs, secrets, project-specific facts, or one-off task details.", + "If the packet only supports the seed observations and no deeper reusable pattern, record nothing.", + f"Recommended source_hash: {source}", + f"Current focus: {focus}", + ]) + + +@dataclass +class SessionPacket: + session_hash: str + source: str + size: int + mtime: str + cwd: str = "" + user_goals: list[str] = field(default_factory=list) + tool_counts: dict[str, int] = field(default_factory=dict) + signals: list[str] = field(default_factory=list) + timeline: list[dict] = field(default_factory=list) + verification_commands: list[str] = field(default_factory=list) + failure_recovery: list[dict] = field(default_factory=list) + seed_observations: list[dict] = field(default_factory=list) + lessons: list[dict] = field(default_factory=list) + quality: float = 0.0 + focus: str = "workflow" + llm_distill_prompt: str = "" + created_at: str = field(default_factory=_utc_now) + + def to_dict(self) -> dict: + return { + "session_hash": self.session_hash, + "source": self.source, + "size": self.size, + "mtime": self.mtime, + "cwd": self.cwd, + "user_goals": self.user_goals, + "tool_counts": self.tool_counts, + "signals": self.signals, + "timeline": self.timeline, + "verification_commands": self.verification_commands, + "failure_recovery": self.failure_recovery, + "seed_observations": self.seed_observations, + "lessons": self.lessons, + "quality": self.quality, + "focus": self.focus, + "llm_distill_prompt": self.llm_distill_prompt, + "created_at": self.created_at, + } + + def to_markdown(self) -> str: + lines = [ + f"# Codex Session Packet {self.session_hash[:12]}", + "", + f"- Source: {redact_text(self.source, 300)}", + f"- Quality: {self.quality:.2f}", + f"- Focus: {self.focus}", + f"- CWD: {redact_text(self.cwd, 160) or '(unknown)'}", + f"- Signals: {', '.join(self.signals) or '(none)'}", + "", + "## User Goals", + ] + lines.extend(f"- {redact_text(goal, 180)}" for goal in 
self.user_goals[:5]) + if not self.user_goals: + lines.append("- (none captured)") + + lines += ["", "## Timeline"] + if self.timeline: + for item in self.timeline[:40]: + command = item.get("command") + summary = item.get("summary") + bits = [f"- {item.get('kind', 'event')}"] + if item.get("tool"): + bits.append(f"tool={item.get('tool')}") + if command: + bits.append(f"command=`{redact_text(command, 180)}`") + if "ok" in item and item.get("ok") is not None: + bits.append(f"ok={item.get('ok')}") + if item.get("signals"): + bits.append(f"signals={','.join(item.get('signals', []))}") + if summary: + bits.append(f"summary={redact_text(summary, 180)}") + lines.append(" | ".join(bits)) + else: + lines.append("- (none captured)") + + lines += ["", "## Verification Commands"] + if self.verification_commands: + lines.extend(f"- `{redact_text(cmd, 180)}`" for cmd in self.verification_commands[:12]) + else: + lines.append("- (none captured)") + + lines += ["", "## Failure Recovery"] + if self.failure_recovery: + for item in self.failure_recovery[:8]: + failed = item.get("failed") or {} + recovered = item.get("recovered_with") or {} + lines.append( + "- failed " + f"`{redact_text(failed.get('command') or failed.get('tool'), 120)}` " + "then recovered with " + f"`{redact_text(recovered.get('command') or recovered.get('tool'), 120)}`" + ) + else: + lines.append("- (none captured)") + + lines += [ + "", + "## Seed Observations", + "These deterministic hints are evidence for the LLM. 
They are not approved lessons by themselves.", + ] + for item in self.seed_observations: + lines += [ + f"### {redact_text(item.get('title'), 160)}", + f"- ID: {item.get('id', '')}", + f"- Category: {item.get('category', '')}", + f"- Guidance: {redact_text(item.get('guidance'), 260)}", + f"- Evidence signals: {', '.join(item.get('signals', []))}", + "", + ] + if not self.seed_observations: + lines.append("- (none captured)") + + lines += ["", "## LLM Distillation Task", self.llm_distill_prompt or _build_llm_distill_prompt(self.session_hash, self.focus)] + return "\n".join(lines).rstrip() + "\n" + + +class DistillState: + def __init__(self, root: Path | str = DEFAULT_STATE_DIR): + self.root = Path(root) + self.progress_path = self.root / "progress.json" + self.lessons_path = self.root / "lessons.jsonl" + self.candidates_path = self.root / "candidate_lessons.jsonl" + self.queue_dir = self.root / "queue" + self.learned_dir = self.root / "learned_packets" + + def ensure(self) -> None: + self.root.mkdir(parents=True, exist_ok=True) + self.queue_dir.mkdir(parents=True, exist_ok=True) + self.learned_dir.mkdir(parents=True, exist_ok=True) + + def load_progress(self) -> dict: + if not self.progress_path.exists(): + return {"version": 1, "sessions": {}} + try: + with open(self.progress_path, "r", encoding="utf-8") as f: + data = json.load(f) + if isinstance(data, dict) and isinstance(data.get("sessions"), dict): + return data + except (OSError, json.JSONDecodeError): + pass + return {"version": 1, "sessions": {}} + + def save_progress(self, progress: dict) -> None: + self.ensure() + tmp = self.progress_path.with_suffix(".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(progress, f, ensure_ascii=False, indent=2) + os.replace(tmp, self.progress_path) + + +def _choose_focus(record: dict | None = None) -> str: + done = set((record or {}).get("focus_done") or []) + for focus in FOCUS_ORDER: + if focus not in done: + return focus + return FOCUS_ORDER[0] + + +def 
_slug(text: str, fallback: str = "lesson") -> str: + text = redact_text(text, 120).lower() + words = re.findall(r"[a-z0-9]+", text) + if not words: + # Keep common Chinese candidate titles deterministic enough without + # relying on transliteration packages. + if "未提交" in text or "worktree" in text: + words = ["protect", "user", "worktree"] + elif "验证" in text: + words = ["verify", "before", "done"] + elif "搜索" in text: + words = ["fast", "search"] + return "_".join(words[:5]) or fallback + + +def _has_sensitive_text(*values: object) -> bool: + raw = "\n".join(str(v or "") for v in values) + if any(pattern.search(raw) for pattern in SECRET_PATTERNS + PATH_PATTERNS): + return True + lowered = raw.lower() + return ".env" in lowered or "private key" in lowered or "-----begin" in lowered + + +def _candidate_id(category: str, title: str) -> str: + category = _slug(category, "workflow") + if "未提交" in title or "worktree" in title.lower(): + return f"{category}_protect_user_worktree" + return f"{category}_{_slug(title, 'lesson')}" + + +def _load_jsonl_records(path: Path) -> list[dict]: + if not path.exists(): + return [] + records = [] + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + except json.JSONDecodeError: + continue + if isinstance(item, dict): + records.append(item) + return records + + +def _source_prefix(source_hash: str) -> str: + return redact_text(source_hash, 80)[:16] if source_hash else "" + + +def _candidate_evidence_count(candidate: dict) -> int: + sources = [s for s in candidate.get("sources", []) if s] + if sources: + return len(set(sources)) + return int(candidate.get("evidence_count", 0)) + + +def codex_lesson_update( + state: DistillState | None = None, + *, + title: str, + guidance: str, + category: str = "workflow", + evidence: Iterable[str] | None = None, + source_hash: str = "", + confidence: float = 0.5, +) -> dict: + """Record an 
LLM-proposed candidate lesson after deterministic checks.""" + state = state or DistillState() + state.ensure() + if _has_sensitive_text(title, guidance, category, " ".join(str(x) for x in (evidence or []))): + return {"status": "rejected", "reason": "sensitive_content_detected"} + title = redact_text(title, 120) + guidance = redact_text(guidance, 320) + category = _slug(category, "workflow") + if category not in VALID_CATEGORIES: + category = "workflow" + evidence = [redact_text(x, 80) for x in (evidence or []) if redact_text(x, 80)] + try: + confidence = max(0.0, min(1.0, float(confidence))) + except (TypeError, ValueError): + confidence = 0.5 + if not title or len(title) < 4: + return {"status": "rejected", "reason": "title_too_short"} + if not guidance or len(guidance) < 12: + return {"status": "rejected", "reason": "guidance_too_short"} + lesson_id = _candidate_id(category, title) + candidates = {item.get("id"): item for item in _load_jsonl_records(state.candidates_path) if item.get("id")} + current = candidates.get(lesson_id) + if current is None: + current = { + "id": lesson_id, + "category": category, + "title": title, + "guidance": guidance, + "evidence": [], + "evidence_count": 0, + "confidence_max": 0.0, + "sources": [], + "created_at": _utc_now(), + } + candidates[lesson_id] = current + current["evidence"] = sorted(set(current.get("evidence", [])) | set(evidence)) + current["confidence_max"] = max(float(current.get("confidence_max", 0.0)), confidence) + source_prefix = _source_prefix(source_hash) + if source_prefix and source_prefix not in current.get("sources", []): + current.setdefault("sources", []).append(source_prefix) + if source_prefix: + current["evidence_count"] = len(set(current.get("sources", []))) + else: + current["evidence_count"] = max(1, int(current.get("evidence_count", 0))) + current["updated_at"] = _utc_now() + + with open(state.candidates_path, "w", encoding="utf-8", newline="\n") as f: + for item in sorted(candidates.values(), 
key=lambda x: (-_candidate_evidence_count(x), x.get("id", ""))): + f.write(json.dumps(item, ensure_ascii=False, sort_keys=True) + "\n") + return {"status": "candidate_recorded", "candidate": current} + + +def extract_session_packet(path: Path | str, focus: str = "workflow") -> SessionPacket: + path = Path(path) + session_hash = file_sha256(path) + stat = path.stat() + mtime = _dt.datetime.fromtimestamp(stat.st_mtime).replace(microsecond=0).isoformat() + + cwd = "" + user_goals: list[str] = [] + tool_counts: dict[str, int] = {} + signals: set[str] = set() + tool_events: list[dict] = [] + call_to_event: dict[str, dict] = {} + timeline: list[dict] = [] + verification_commands: list[str] = [] + failure_recovery: list[dict] = [] + last_failed_event: dict | None = None + patch_seen = False + verification_seen = False + verification_success = False + probe_before_patch = False + failed_then_success = False + saw_failure = False + + for row in _load_jsonl(path): + top_type = row.get("type") + payload = row.get("payload") or {} + if top_type == "session_meta": + cwd = redact_text(payload.get("cwd"), 200) + if top_type == "event_msg": + text = _payload_text(payload) + if text and len(user_goals) < 6: + user_goals.append(redact_text(text, 180)) + if top_type != "response_item" or not isinstance(payload, dict): + continue + + ptype = payload.get("type") + if ptype == "message": + if payload.get("role") == "user" and len(user_goals) < 6: + text = _payload_text(payload) + if text: + user_goals.append(redact_text(text, 180)) + continue + if ptype not in {"function_call", "custom_tool_call", "web_search_call", "function_call_output", "custom_tool_call_output"}: + continue + + if ptype in {"function_call", "custom_tool_call", "web_search_call"}: + name = str(payload.get("name") or ptype) + tool_counts[name] = tool_counts.get(name, 0) + 1 + args = _parse_arguments(payload.get("arguments")) + command = str(args.get("command") or args.get("cmd") or args.get("script") or "") + event = 
{ + "name": name, + "command": redact_text(command, 300), + "signals": sorted(_command_signal(command)), + "ok": None, + } + timeline_event = { + "kind": "tool_call", + "tool": name, + "command": event["command"], + "signals": event["signals"], + "ok": None, + } + event["timeline_index"] = len(timeline) + timeline.append(timeline_event) + call_id = payload.get("call_id") + if call_id: + call_to_event[str(call_id)] = event + tool_events.append(event) + signals.update(event["signals"]) + if name == "apply_patch" or "patch" in name: + patch_seen = True + signals.add("patch") + if any("repo_probe" in set(e.get("signals", [])) or "fast_search" in set(e.get("signals", [])) for e in tool_events[:-1]): + probe_before_patch = True + if "verification" in event["signals"]: + verification_seen = True + if event["command"] and event["command"] not in verification_commands: + verification_commands.append(event["command"]) + continue + + output = str(payload.get("output") or "") + event = call_to_event.get(str(payload.get("call_id") or "")) + if event is None and tool_events: + event = next((e for e in reversed(tool_events) if e.get("ok") is None), None) + if event is not None: + ok = "Exit code: 0" in output or "\nOK" in output or "passed" in output.lower() + failed = "Exit code: 1" in output or "FAILED" in output or "Traceback" in output + event["ok"] = ok if ok or failed else event.get("ok") + idx = event.get("timeline_index") + if isinstance(idx, int) and 0 <= idx < len(timeline): + timeline[idx]["ok"] = event["ok"] + timeline[idx]["summary"] = redact_text(output, 180) + if failed: + saw_failure = True + signals.add("failure") + last_failed_event = { + "tool": event.get("name"), + "command": event.get("command"), + "signals": event.get("signals", []), + } + if ok: + if saw_failure: + failed_then_success = True + signals.add("recovery") + if last_failed_event: + failure_recovery.append({ + "failed": last_failed_event, + "recovered_with": { + "tool": event.get("name"), + 
"command": event.get("command"), + "signals": event.get("signals", []), + }, + }) + last_failed_event = None + if "verification" in set(event.get("signals", [])): + verification_success = True + + seed_observations = [] + if probe_before_patch or (patch_seen and any(s in signals for s in ("repo_probe", "fast_search"))): + seed_observations.append(_lesson( + "repo_probe_before_edit", + "workflow", + "Probe repository facts before editing", + "For coding work, inspect project rules, status, and nearby code before making the smallest viable patch.", + ["repo_probe", "patch"] + sorted(signals & {"fast_search", "git_status"}), + )) + if "fast_search" in signals: + seed_observations.append(_lesson( + "prefer_fast_text_search", + "workflow", + "Use fast text search to orient in code", + "Reach for precise repository search before broad directory traversal when locating files, symbols, or tests.", + ["fast_search"], + )) + if patch_seen and (verification_success or verification_seen): + seed_observations.append(_lesson( + "verify_changes_before_done", + "testing", + "Verify changed behavior before claiming completion", + "After edits, run the focused test or project verification command and report failures instead of assuming success.", + ["patch", "verification"] + (["verification_success"] if verification_success else []), + )) + if failed_then_success: + seed_observations.append(_lesson( + "failure_recovery_with_new_information", + "debugging", + "Recover from failures by adding information", + "On failure, read the error, gather new state, then change strategy; avoid repeating the same command without new evidence.", + ["failure", "recovery"], + )) + + quality = 0.0 + if patch_seen: + quality += 0.25 + if any(s in signals for s in ("repo_probe", "fast_search")): + quality += 0.20 + if verification_seen: + quality += 0.20 + if verification_success: + quality += 0.15 + if failed_then_success: + quality += 0.15 + if seed_observations: + quality += 0.10 + quality = 
min(1.0, quality) + + return SessionPacket( + session_hash=session_hash, + source=str(path), + size=stat.st_size, + mtime=mtime, + cwd=cwd, + user_goals=[g for i, g in enumerate(user_goals) if g and g not in user_goals[:i]][:6], + tool_counts=tool_counts, + signals=sorted(signals), + timeline=timeline[:80], + verification_commands=verification_commands[:20], + failure_recovery=failure_recovery[:12], + seed_observations=seed_observations, + lessons=[], + quality=quality, + focus=focus, + llm_distill_prompt=_build_llm_distill_prompt(session_hash, focus), + ) + + +def iter_session_files(roots: Iterable[Path | str]) -> Iterable[Path]: + for root in roots: + root = Path(root).expanduser() + if root.is_file() and root.suffix.lower() == ".jsonl": + yield root + elif root.is_dir(): + yield from sorted(root.rglob("*.jsonl")) + + +def _resolve_roots(roots: Iterable[Path | str] | None) -> list[Path | str]: + roots = list(roots or []) + if not roots or roots == [DEFAULT_ROOT_SENTINEL]: + discovered = discover_codex_session_roots() + return discovered or [DEFAULT_CODEX_ROOT] + return roots + + +def prepare_sessions( + roots: Iterable[Path | str], + state: DistillState | None = None, + limit: int = 3, + min_quality: float = 0.55, +) -> list[SessionPacket]: + state = state or DistillState() + roots = _resolve_roots(roots) + state.ensure() + progress = state.load_progress() + prepared: list[SessionPacket] = [] + + for path in iter_session_files(roots): + if len(prepared) >= limit: + break + try: + session_hash = file_sha256(path) + stat = path.stat() + except OSError: + continue + key = f"sha256:{session_hash}" + record = progress["sessions"].get(key) + if record and record.get("status") in {"prepared", "learned", "skipped"} and record.get("size") == stat.st_size: + continue + + focus = _choose_focus(record) + packet = extract_session_packet(path, focus=focus) + if packet.quality < min_quality or not (packet.seed_observations or packet.timeline): + progress["sessions"][key] = { + 
"path": str(path), + "size": stat.st_size, + "mtime": packet.mtime, + "status": "skipped", + "quality": packet.quality, + "reason": "low_quality_or_no_lessons", + "learn_count": (record or {}).get("learn_count", 0), + "focus_done": (record or {}).get("focus_done", []), + "updated_at": _utc_now(), + } + continue + + packet_path = state.queue_dir / f"{packet.session_hash[:16]}-{packet.focus}.json" + with open(packet_path, "w", encoding="utf-8") as f: + json.dump(packet.to_dict(), f, ensure_ascii=False, indent=2) + with open(packet_path.with_suffix(".md"), "w", encoding="utf-8") as f: + f.write(packet.to_markdown()) + + progress["sessions"][key] = { + "path": str(path), + "size": stat.st_size, + "mtime": packet.mtime, + "status": "prepared", + "quality": packet.quality, + "learn_count": (record or {}).get("learn_count", 0), + "focus_done": (record or {}).get("focus_done", []), + "packet": str(packet_path), + "updated_at": _utc_now(), + } + prepared.append(packet) + + state.save_progress(progress) + return prepared + + +def _load_lessons(path: Path) -> dict[str, dict]: + lessons = {} + if not path.exists(): + return lessons + with open(path, "r", encoding="utf-8", errors="replace") as f: + for line in f: + line = line.strip() + if not line: + continue + try: + item = json.loads(line) + except json.JSONDecodeError: + continue + lesson_id = item.get("id") + if lesson_id: + lessons[lesson_id] = item + return lessons + + +def _write_lessons(path: Path, lessons: dict[str, dict]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "w", encoding="utf-8", newline="\n") as f: + for item in sorted(lessons.values(), key=lambda x: (-int(x.get("evidence_count", 0)), x.get("id", ""))): + f.write(json.dumps(item, ensure_ascii=False, sort_keys=True) + "\n") + + +def learn_from_packets(state: DistillState | None = None, limit: int = 5) -> int: + state = state or DistillState() + state.ensure() + progress = state.load_progress() + lessons = 
_load_lessons(state.lessons_path) + learned = 0 + + for packet_path in sorted(state.queue_dir.glob("*.json")): + if learned >= limit: + break + try: + with open(packet_path, "r", encoding="utf-8") as f: + packet = json.load(f) + except (OSError, json.JSONDecodeError): + continue + + packet_lessons = packet.get("lessons") or [] + if not packet_lessons: + continue + + source_hash = packet.get("session_hash", "") + source_key = f"sha256:{source_hash}" if source_hash else "" + for lesson in packet_lessons: + lesson_id = lesson.get("id") + if not lesson_id: + continue + current = lessons.get(lesson_id) + if current is None: + current = { + "id": lesson_id, + "category": lesson.get("category", "workflow"), + "title": lesson.get("title", lesson_id), + "guidance": lesson.get("guidance", ""), + "signals": sorted(set(lesson.get("signals", []))), + "evidence_count": 0, + "sources": [], + "quality_max": 0.0, + "created_at": _utc_now(), + } + lessons[lesson_id] = current + current["evidence_count"] = int(current.get("evidence_count", 0)) + 1 + current["quality_max"] = max(float(current.get("quality_max", 0.0)), float(packet.get("quality", 0.0))) + current["signals"] = sorted(set(current.get("signals", [])) | set(lesson.get("signals", []))) + if source_hash and source_hash[:16] not in current["sources"]: + current["sources"].append(source_hash[:16]) + current["updated_at"] = _utc_now() + + record = progress["sessions"].get(source_key) + if record is not None: + record["status"] = "learned" + record["learn_count"] = int(record.get("learn_count", 0)) + 1 + focus_done = list(record.get("focus_done") or []) + focus = packet.get("focus", "workflow") + if focus not in focus_done: + focus_done.append(focus) + record["focus_done"] = focus_done + record["updated_at"] = _utc_now() + + learned += 1 + target = state.learned_dir / packet_path.name + shutil.move(str(packet_path), str(target)) + md_path = packet_path.with_suffix(".md") + if md_path.exists(): + shutil.move(str(md_path), 
str(target.with_suffix(".md"))) + + _write_lessons(state.lessons_path, lessons) + state.save_progress(progress) + return learned + + +def promote_candidates( + state: DistillState | None = None, + min_evidence: int = 2, + min_confidence: float = 0.85, +) -> int: + """Promote validated candidate lessons into formal lessons.""" + state = state or DistillState() + state.ensure() + candidates = _load_jsonl_records(state.candidates_path) + lessons = _load_lessons(state.lessons_path) + promoted = 0 + for candidate in candidates: + evidence_count = _candidate_evidence_count(candidate) + if evidence_count < min_evidence and float(candidate.get("confidence_max", 0.0)) < min_confidence: + continue + lesson_id = candidate.get("id") + if not lesson_id: + continue + current = lessons.get(lesson_id) + if current is None: + current = { + "id": lesson_id, + "category": candidate.get("category", "workflow"), + "title": candidate.get("title", lesson_id), + "guidance": candidate.get("guidance", ""), + "signals": sorted(set(candidate.get("evidence", []))), + "evidence_count": 0, + "sources": [], + "quality_max": float(candidate.get("confidence_max", 0.0)), + "created_at": _utc_now(), + } + lessons[lesson_id] = current + promoted += 1 + current["evidence_count"] = max(int(current.get("evidence_count", 0)), evidence_count) + current["quality_max"] = max(float(current.get("quality_max", 0.0)), float(candidate.get("confidence_max", 0.0))) + current["signals"] = sorted(set(current.get("signals", [])) | set(candidate.get("evidence", []))) + current["sources"] = sorted(set(current.get("sources", [])) | set(candidate.get("sources", []))) + current["updated_at"] = _utc_now() + _write_lessons(state.lessons_path, lessons) + return promoted + + +def render_sop(state: DistillState | None = None, output_path: Path | str = DEFAULT_SOP_PATH) -> str: + state = state or DistillState() + lessons = _load_lessons(state.lessons_path) + output_path = Path(output_path) + grouped: dict[str, list[dict]] = {} + 
for item in lessons.values(): + grouped.setdefault(item.get("category", "workflow"), []).append(item) + + lines = [ + "# Codex Coding SOP", + "", + "来源:由 `codex_session_distill.py` 从本机 Codex JSONL 会话生成脱敏证据包,再由 LLM 读取 packet 并通过 `codex_lesson_update` 提议、校验、晋升。本文只保留跨项目编码工作法,不保存原始对话、密钥、私有路径或一次性业务事实。", + "", + "## 使用原则", + "- 先按当前仓库规范执行;本 SOP 只提供编码协作习惯和避坑策略。", + "- 经验应来自 LLM 对脱敏 packet 的分析;规则命中的 seed observation 只作为证据提示,不能单独成为正式经验。", + "- 经验有独立证据计数,证据越多优先级越高;单次会话经验只作为弱提示。", + "- 若本 SOP 与项目 AGENTS/CONTRIBUTING/用户指令冲突,以上游明确指令为准。", + "", + ] + if not grouped: + lines += ["## 当前状态", "- 尚未学习到足够高质量的 Codex 会话经验。", ""] + for category in sorted(grouped): + lines.append(f"## {category}") + rows = sorted(grouped[category], key=lambda x: (-int(x.get("evidence_count", 0)), x.get("title", ""))) + for item in rows: + signals = ", ".join(item.get("signals", [])) + lines += [ + f"### {redact_text(item.get('title'), 160)}", + f"- 做法:{redact_text(item.get('guidance'), 320)}", + f"- 证据: {int(item.get('evidence_count', 0))} | signals: {signals or '(none)'}", + "", + ] + text = "\n".join(lines).rstrip() + "\n" + output_path.parent.mkdir(parents=True, exist_ok=True) + with open(output_path, "w", encoding="utf-8", newline="\n") as f: + f.write(text) + return text + + +def status_text(state: DistillState | None = None) -> str: + state = state or DistillState() + progress = state.load_progress() + counts: dict[str, int] = {} + for record in progress.get("sessions", {}).values(): + status = record.get("status", "unknown") + counts[status] = counts.get(status, 0) + 1 + lessons = _load_lessons(state.lessons_path) + candidates = _load_jsonl_records(state.candidates_path) + queued = len(list(state.queue_dir.glob("*.json"))) if state.queue_dir.exists() else 0 + return ( + f"sessions={sum(counts.values())} " + f"prepared={counts.get('prepared', 0)} learned={counts.get('learned', 0)} " + f"skipped={counts.get('skipped', 0)} queued={queued} candidates={len(candidates)} lessons={len(lessons)}" + ) + + 
+def deep_distill_instructions(packets: list[SessionPacket], state: DistillState | None = None) -> str: + state = state or DistillState() + lines = [ + "# Codex LLM Deep Distill Batch", + "", + "LLM is the distillation core. The parser only prepared redacted evidence packets and deterministic seed observations.", + "For each queued packet markdown below, read the packet and call `codex_lesson_update` for 0-3 reusable lessons supported by the evidence.", + "Do not write raw Codex logs, secrets, private paths, or project-specific facts into memory.", + "", + "## Queued Packets", + ] + if packets: + for packet in packets: + md_path = state.queue_dir / f"{packet.session_hash[:16]}-{packet.focus}.md" + lines.append(f"- {md_path}") + else: + lines.append("- No new packets prepared. Check `status` for queued packets from earlier runs.") + if state.queue_dir.exists(): + for md_path in sorted(state.queue_dir.glob("*.md"))[:20]: + lines.append(f"- {md_path}") + lines += [ + "", + "## After LLM Review", + "Run `python memory/codex_session_distill.py promote` to promote validated candidates and render `memory/codex_coding_sop.md`.", + ] + return "\n".join(lines) + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Distill reusable coding lessons from Codex JSONL sessions.") + parser.add_argument("--state-dir", default=str(DEFAULT_STATE_DIR), help="Progress and lesson state directory.") + sub = parser.add_subparsers(dest="cmd", required=True) + + scan = sub.add_parser("scan", help="Count candidate Codex JSONL files.") + scan.add_argument("roots", nargs="*", default=[DEFAULT_ROOT_SENTINEL], help="Files or directories to scan. Defaults to auto-discovered .codex/sessions.") + + prepare = sub.add_parser("prepare", help="Parse sessions and enqueue redacted lesson packets.") + prepare.add_argument("roots", nargs="*", default=[DEFAULT_ROOT_SENTINEL], help="Files or directories to scan. 
Defaults to auto-discovered .codex/sessions.") + prepare.add_argument("--limit", type=int, default=3) + prepare.add_argument("--min-quality", type=float, default=0.55) + + deep = sub.add_parser("deep", help="Prepare redacted packets and print LLM review instructions.") + deep.add_argument("roots", nargs="*", default=[DEFAULT_ROOT_SENTINEL], help="Files or directories to scan. Defaults to auto-discovered .codex/sessions.") + deep.add_argument("--limit", type=int, default=3) + deep.add_argument("--min-quality", type=float, default=0.55) + + learn = sub.add_parser("learn", help="Merge queued packets that already contain LLM-approved lessons into lessons.jsonl.") + learn.add_argument("--limit", type=int, default=5) + + candidate = sub.add_parser("candidate", help="Record one LLM-proposed candidate lesson.") + candidate.add_argument("--title", required=True) + candidate.add_argument("--guidance", required=True) + candidate.add_argument("--category", default="workflow") + candidate.add_argument("--evidence", action="append", default=[]) + candidate.add_argument("--source-hash", default="") + candidate.add_argument("--confidence", type=float, default=0.5) + + promote = sub.add_parser("promote", help="Promote strong candidates into formal lessons.") + promote.add_argument("--min-evidence", type=int, default=2) + promote.add_argument("--min-confidence", type=float, default=0.85) + promote.add_argument("--output", default=str(DEFAULT_SOP_PATH), help="Rendered SOP output path.") + + render = sub.add_parser("render", help="Render lessons.jsonl to codex_coding_sop.md.") + render.add_argument("--output", default=str(DEFAULT_SOP_PATH)) + + run = sub.add_parser("run", help="Legacy safe batch: prepare packets, merge any existing LLM-approved packet lessons, and render.") + run.add_argument("roots", nargs="*", default=[DEFAULT_ROOT_SENTINEL], help="Files or directories to scan. 
Defaults to auto-discovered .codex/sessions.") + run.add_argument("--limit", type=int, default=3) + run.add_argument("--min-quality", type=float, default=0.55) + run.add_argument("--output", default=str(DEFAULT_SOP_PATH)) + + sub.add_parser("status", help="Show progress summary.") + return parser + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + state = DistillState(args.state_dir) + + if args.cmd == "scan": + roots = _resolve_roots(args.roots) + files = list(iter_session_files(roots)) + print(f"roots={';'.join(str(r) for r in roots)} found={len(files)}") + return 0 + if args.cmd == "prepare": + packets = prepare_sessions(args.roots, state=state, limit=args.limit, min_quality=args.min_quality) + print(f"prepared={len(packets)} {status_text(state)}") + return 0 + if args.cmd == "deep": + packets = prepare_sessions(args.roots, state=state, limit=args.limit, min_quality=args.min_quality) + print(deep_distill_instructions(packets, state)) + print(f"\nprepared={len(packets)} {status_text(state)}") + return 0 + if args.cmd == "learn": + learned = learn_from_packets(state=state, limit=args.limit) + print(f"learned={learned} {status_text(state)}") + return 0 + if args.cmd == "candidate": + result = codex_lesson_update( + state, + title=args.title, + guidance=args.guidance, + category=args.category, + evidence=args.evidence, + source_hash=args.source_hash, + confidence=args.confidence, + ) + print(json.dumps(result, ensure_ascii=False)) + return 0 if result.get("status") != "rejected" else 1 + if args.cmd == "promote": + promoted = promote_candidates(state, min_evidence=args.min_evidence, min_confidence=args.min_confidence) + text = render_sop(state=state, output_path=args.output) + print(f"promoted={promoted} rendered={args.output} bytes={len(text.encode('utf-8'))} {status_text(state)}") + return 0 + if args.cmd == "render": + text = render_sop(state=state, output_path=args.output) + 
print(f"rendered={args.output} bytes={len(text.encode('utf-8'))}") + return 0 + if args.cmd == "run": + packets = prepare_sessions(args.roots, state=state, limit=args.limit, min_quality=args.min_quality) + learned = learn_from_packets(state=state, limit=args.limit) + text = render_sop(state=state, output_path=args.output) + print( + f"prepared={len(packets)} learned={learned} rendered={args.output} bytes={len(text.encode('utf-8'))} " + "note=deep_lessons_require_llm_review" + ) + return 0 + if args.cmd == "status": + print(status_text(state)) + return 0 + parser.print_help() + return 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/memory/codex_session_distill_sop.md b/memory/codex_session_distill_sop.md new file mode 100644 index 00000000..52a5825d --- /dev/null +++ b/memory/codex_session_distill_sop.md @@ -0,0 +1,91 @@ +# Codex Session Distill SOP + +**触发**:用户要求“学习 Codex 历史 / 蒸馏 Codex 会话 / 提升编码经验”,或自主行动需要低风险学习任务。 +**目标**:从本机 Codex JSONL 会话中提炼跨项目编码工作法,更新 `codex_coding_sop.md`。不保存原始对话、密钥、私有路径或一次性业务事实。 + +## 快速流程 + +1. 查看状态: + ```powershell + python ../memory/codex_session_distill.py status + ``` + +2. 小批量准备 LLM 深度蒸馏包: + ```powershell + python ../memory/codex_session_distill.py deep --limit 3 + ``` + 未指定目录时,工具会自动发现本机 Codex sessions 目录,优先检查 `$CODEX_SESSIONS_DIR`、`$CODEX_HOME\sessions`、用户目录下的 `.codex\sessions`、AppData 常见位置,以及当前工作区附近的 `.codex\sessions`。这个命令只生成脱敏证据包和 LLM 审阅说明,不会把规则命中的 seed observation 直接当成正式经验。 + +3. 如需指定目录: + ```powershell + python ../memory/codex_session_distill.py deep "$env:USERPROFILE\.codex\sessions\2026\05" --limit 3 + ``` + +4. 读取 `../memory/codex_distill/queue/*.md`。基于 packet 中的目标、timeline、验证命令、失败恢复和 seed observations 判断是否有可复用经验;有则调用 `codex_lesson_update` 写入候选。 + +5. 晋升候选并渲染 SOP: + ```powershell + python ../memory/codex_session_distill.py promote + ``` + +6. 蒸馏完成后读取: + ```text + ../memory/codex_coding_sop.md + ``` + +## LLM 参与的深度蒸馏 + +当用户要求“深入学习 Codex 经验”或你发现规则蒸馏过于保守时,走工具闭环,不要直接 patch 正式 SOP: + +1. 
生成脱敏 packet: + ```powershell + python ../memory/codex_session_distill.py deep --limit 1 + ``` + +2. 读取 `../memory/codex_distill/queue/*.md` 中最新 packet。只基于 packet 内容判断,禁止读取原始 JSONL。packet 中的 `Seed Observations` 是确定性证据提示,不是正式 lesson;必须由 LLM 结合完整轨迹判断是否存在更丰富、可迁移的经验。 + +3. 若发现可复用经验,调用 `codex_lesson_update` 工具写入候选。每次只写一条经验,字段要短: + - `title`: 简短标题 + - `guidance`: 可复用做法,不写项目私有事实 + - `category`: workflow/debugging/testing/git/security/frontend/planning/communication/quality + - `evidence`: packet 中的短证据信号 + - `source_hash`: packet 标题或内容里的 hash 前缀 + - `confidence`: 0.0-1.0 + +4. 晋升候选并渲染 SOP: + ```powershell + python ../memory/codex_session_distill.py promote + ``` + +这个流程中,LLM 是蒸馏核心,负责发现和表达经验;程序只负责脱敏、证据包生成、安全校验、去重、晋升和写文件。不要用简单规则匹配替代 LLM 判断。 + +## 写入边界 + +- 允许写入:`../memory/codex_distill/`、`../memory/codex_coding_sop.md` +- 不要读取或复制密钥文件。 +- 不要把 JSONL 原文写入长期记忆。 +- 不要把项目私有事实、绝对用户路径、token、cookie、临时错误细节写入 SOP。 + +## 进度规则 + +- `codex_distill/progress.json` 用 session hash 记录处理状态。 +- 已 `prepared` / `learned` / `skipped` 且 size 未变的 JSONL 默认跳过。 +- `codex_lesson_update` 的 `evidence_count` 按独立 `source_hash` 计数;同一 packet 反复调用不会刷高晋升证据。 +- 重复出现的好经验只增加 `lessons.jsonl` 中的独立证据计数,不重复扩写 SOP。 +- 扫描不是随机采样:按文件路径稳定排序,取前 N 个未处理且达到质量门槛的 JSONL。传入月份目录时只处理该月份目录;不传目录时处理自动发现到的 sessions 根目录。 + +## 质量门槛 + +优先学习有这些信号的会话: + +- 先搜索/读取仓库,再改文件 +- 有 `apply_patch` 或文件修改 +- 修改后运行测试、lint、类型检查或其他验证 +- 失败后读取错误并切换策略,最终恢复成功 + +跳过: + +- 纯闲聊或短问答 +- 无验证的猜测 +- 只包含一次性路径、业务状态、日志粘贴的会话 +- 脱敏风险高的会话 diff --git a/mykey_template.py b/mykey_template.py index dc681556..b8a99045 100644 --- a/mykey_template.py +++ b/mykey_template.py @@ -65,6 +65,13 @@ # reasoning_effort 合法值: none / minimal / low / medium / high / xhigh # thinking_type 合法值: adaptive / enabled / disabled # +# ══════════════════════════════════════════════════════════════════════════════ + +# ────────── TUI 设置 ────────── +# tuiapp_v2 左侧 RECENT 区显示最近可恢复历史会话的数量。 +tui_recent_sessions_limit = 10 + + # 
══════════════════════════════════════════════════════════════════════════════ # 所有字段速查(按 BaseSession.__init__ 顺序) # ─── 鉴权 / 路由 ───────────────────────────────────────────────────────────── @@ -291,6 +298,8 @@ 'apibase': 'https://api.openai.com/v1', # 补齐到 /v1/chat/completions 'model': 'gpt-5.4', # gpt-5/o 系列 'api_mode': 'chat_completions', # 'chat_completions'(默认)|'responses' + # 'native_tools': False, # OAI 兼容后端不支持 tools 字段时关闭,退回文本工具协议 + # 'native_image_input': True, # 仅支持原生图片输入的 OpenAI 模型开启;本地图片路径会转图片块 # 'reasoning_effort': 'high', # none|minimal|low|medium|high|xhigh # chat_completions → payload.reasoning_effort # responses → payload.reasoning.effort @@ -312,6 +321,8 @@ # 'apibase': 'https://api.openai.com/v1', # 补齐到 /v1/responses(因为 api_mode=responses) # 'model': 'gpt-5.4', # gpt-5/o 系列 # 'api_mode': 'responses', # 改走 /v1/responses 端点 +# 'native_tools': False, # OAI 兼容后端不支持 tools 字段时关闭 +# 'native_image_input': True, # 仅支持原生图片输入的 OpenAI 模型开启 # 'reasoning_effort': 'high', # none|minimal|low|medium|high|xhigh # # responses 模式下写进 payload.reasoning.effort # 'max_retries': 2, # int 默认 1 diff --git a/mykey_template_en.py b/mykey_template_en.py index ea13575d..d824030b 100644 --- a/mykey_template_en.py +++ b/mykey_template_en.py @@ -23,6 +23,13 @@ # # ══════════════════════════════════════════════════════════════════════════════ +# ── TUI settings ──────────────────────────────────────────────────────────── +# Number of recoverable historical sessions shown in tuiapp_v2's RECENT area. +tui_recent_sessions_limit = 10 + + +# ══════════════════════════════════════════════════════════════════════════════ + # ── 1. NativeClaudeSession — Anthropic direct ──────────────────────────────── # Official Anthropic endpoint. apikey starting with 'sk-ant-' is auto-sent @@ -49,6 +56,8 @@ 'apibase': 'https://api.openai.com/v1', 'model': 'gpt-5.4', # or 'o4', 'gpt-5.3-codex', etc. 
'api_mode': 'chat_completions', # or 'responses' for /v1/responses + # 'native_tools': False, # disable if an OpenAI-compatible provider rejects the tools field + # 'native_image_input': True, # enable only for OpenAI-native models that support image input # 'reasoning_effort': 'high', # none|minimal|low|medium|high|xhigh # 'max_retries': 3, # 'read_timeout': 120, diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..3965732d --- /dev/null +++ b/requirements.txt @@ -0,0 +1,12 @@ +# GenericAgent dependency reference generated from the local environment. +# Local Python version: 3.12.4 +# This file pins the README-mentioned packages for comparison/installation. + +requests==2.31.0 + +# Desktop GUI (launch.pyw) +streamlit==1.48.0 +pywebview==6.2.1 + +# Terminal UI (frontends/tuiapp.py) +textual==7.5.0 diff --git a/tests/test_agentmain_llm_sessions.py b/tests/test_agentmain_llm_sessions.py new file mode 100644 index 00000000..82fe8080 --- /dev/null +++ b/tests/test_agentmain_llm_sessions.py @@ -0,0 +1,40 @@ +import sys +import unittest +from pathlib import Path +from unittest.mock import patch + + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + + +from agentmain import GenericAgent # noqa: E402 + + +class AgentMainLLMSessionsTest(unittest.TestCase): + def test_failed_mixin_config_is_not_left_as_default_client(self): + backend = type("Backend", (), {"history": [], "name": "native", "model": "gpt-test"})() + client = type("Client", (), {"backend": backend, "last_tools": ""})() + mykeys = { + "mixin_config": {"llm_nos": ["missing"]}, + "native_oai_config": {"name": "native"}, + } + agent = GenericAgent.__new__(GenericAgent) + agent.llm_no = 0 + globals_ref = GenericAgent.load_llm_sessions.__globals__ + + with patch.dict(globals_ref, { + "reload_mykeys": lambda: (mykeys, True), + "resolve_client": lambda _cfg_name: client, + "MixinSession": 
type("FailingMixinSession", (), {"__init__": lambda self, *_args, **_kwargs: (_ for _ in ()).throw(Exception("missing mixin"))}), + }): + agent.load_llm_sessions() + + self.assertEqual(agent.llmclients, [client]) + self.assertIs(agent.llmclient, client) + self.assertFalse(isinstance(agent.llmclient, dict)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_codex_lesson_update_tool.py b/tests/test_codex_lesson_update_tool.py new file mode 100644 index 00000000..83cc8d21 --- /dev/null +++ b/tests/test_codex_lesson_update_tool.py @@ -0,0 +1,71 @@ +import sys +import json +import tempfile +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[1] +if str(REPO_ROOT) not in sys.path: + sys.path.insert(0, str(REPO_ROOT)) + + +from ga import GenericAgentHandler # noqa: E402 + + +def _exhaust(gen): + try: + while True: + next(gen) + except StopIteration as e: + return e.value + + +class CodexLessonUpdateToolTest(unittest.TestCase): + def test_tool_schema_exposes_state_dir_for_isolated_e2e_runs(self): + schema_path = REPO_ROOT / "assets" / "tools_schema.json" + tools = json.loads(schema_path.read_text(encoding="utf-8")) + lesson_tool = next(item for item in tools if item["function"]["name"] == "codex_lesson_update") + properties = lesson_tool["function"]["parameters"]["properties"] + + self.assertIn("state_dir", properties) + self.assertIn("isolated", properties["state_dir"]["description"].lower()) + + def test_handler_writes_candidate_lesson_through_tool(self): + with tempfile.TemporaryDirectory() as tmp: + parent = type("Parent", (), {"task_dir": None, "verbose": False})() + handler = GenericAgentHandler(parent, last_history=[], cwd=tmp) + + outcome = _exhaust(handler.do_codex_lesson_update({ + "title": "保护用户未提交改动", + "guidance": "改代码前检查 git status,遇到无关改动只绕开,不重置或覆盖。", + "category": "git", + "evidence": ["git_status", "patch"], + "source_hash": "abc123", + "confidence": 0.9, + "state_dir": str(Path(tmp) / "state"), 
+ }, type("Response", (), {"content": ""})())) + + self.assertEqual(outcome.data["status"], "candidate_recorded") + self.assertTrue((Path(tmp) / "state" / "candidate_lessons.jsonl").exists()) + + def test_handler_rejects_sensitive_candidate(self): + with tempfile.TemporaryDirectory() as tmp: + parent = type("Parent", (), {"task_dir": None, "verbose": False})() + handler = GenericAgentHandler(parent, last_history=[], cwd=tmp) + + outcome = _exhaust(handler.do_codex_lesson_update({ + "title": "坏经验", + "guidance": r"读取 C:\Users\Administrator\.env 里的 password=abc", + "category": "security", + "evidence": ["secret"], + "source_hash": "abc124", + "confidence": 0.9, + "state_dir": str(Path(tmp) / "state"), + }, type("Response", (), {"content": ""})())) + + self.assertEqual(outcome.data["status"], "rejected") + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_codex_session_distill.py b/tests/test_codex_session_distill.py new file mode 100644 index 00000000..b6adfe71 --- /dev/null +++ b/tests/test_codex_session_distill.py @@ -0,0 +1,333 @@ +import json +import sys +import tempfile +import unittest +from pathlib import Path + + +REPO_ROOT = Path(__file__).resolve().parents[1] +MEMORY_DIR = REPO_ROOT / "memory" +if str(MEMORY_DIR) not in sys.path: + sys.path.insert(0, str(MEMORY_DIR)) + + +from codex_session_distill import ( # noqa: E402 + build_parser, + DistillState, + codex_lesson_update, + deep_distill_instructions, + discover_codex_session_roots, + extract_session_packet, + learn_from_packets, + promote_candidates, + prepare_sessions, + render_sop, +) + + +def _write_jsonl(path, rows): + with open(path, "w", encoding="utf-8") as f: + for row in rows: + f.write(json.dumps(row, ensure_ascii=False) + "\n") + + +def _function_call(name, arguments=None, call_id=None): + return { + "timestamp": "2026-05-09T12:00:00Z", + "type": "response_item", + "payload": { + "type": "function_call", + "name": name, + "arguments": json.dumps(arguments or {}, 
ensure_ascii=False), + "call_id": call_id or f"call_{name}", + }, + } + + +def _function_output(call_id, output): + return { + "timestamp": "2026-05-09T12:00:01Z", + "type": "response_item", + "payload": { + "type": "function_call_output", + "call_id": call_id, + "output": output, + }, + } + + +class CodexSessionDistillTest(unittest.TestCase): + def test_discover_codex_session_roots_checks_home_appdata_and_workspace_candidates(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + home = tmp_path / "home" + appdata = tmp_path / "appdata" + workspace = tmp_path / "workspace" + first = home / ".codex" / "sessions" + second = appdata / "Codex" / "sessions" + third = workspace / ".codex" / "sessions" + for path in (first, second, third): + path.mkdir(parents=True) + + roots = discover_codex_session_roots(home=home, appdata=appdata, cwd=workspace, env={}) + + self.assertEqual(roots, [first.resolve(), second.resolve(), third.resolve()]) + + def test_extract_session_packet_redacts_sensitive_text_and_builds_llm_distill_packet(self): + with tempfile.TemporaryDirectory() as tmp: + session = Path(tmp) / "rollout.jsonl" + _write_jsonl( + session, + [ + { + "timestamp": "2026-05-09T12:00:00Z", + "type": "session_meta", + "payload": { + "cwd": r"C:\Users\Administrator\secret_project", + "id": "s1", + }, + }, + { + "timestamp": "2026-05-09T12:00:00Z", + "type": "event_msg", + "payload": {"type": "user_message", "message": "fix bug with sk-test-secret-1234567890"}, + }, + _function_call("shell_command", {"command": "rg -n \"broken\" .", "workdir": r"C:\Users\Administrator\secret_project"}, "call_search"), + _function_output("call_search", "Exit code: 0\nbroken found"), + _function_call("apply_patch", {"patch": "*** Begin Patch\n*** Update File: app.py\n*** End Patch"}), + _function_call("shell_command", {"command": "python -m unittest discover -s tests"}, "call_test"), + _function_output("call_test", "Exit code: 0\nOK"), + { + "timestamp": 
"2026-05-09T12:00:02Z", + "type": "response_item", + "payload": { + "type": "message", + "role": "assistant", + "content": [{"type": "output_text", "text": "done"}], + }, + }, + ], + ) + + packet = extract_session_packet(session) + markdown = packet.to_markdown() + + seed_ids = {lesson["id"] for lesson in packet.seed_observations} + self.assertIn("repo_probe_before_edit", seed_ids) + self.assertIn("verify_changes_before_done", seed_ids) + self.assertIn("prefer_fast_text_search", seed_ids) + self.assertEqual(packet.lessons, []) + self.assertGreaterEqual(len(packet.timeline), 3) + self.assertIn("python -m unittest discover -s tests", packet.verification_commands) + self.assertIn("LLM Distillation Task", markdown) + self.assertIn("codex_lesson_update", packet.llm_distill_prompt) + self.assertGreaterEqual(packet.quality, 0.6) + self.assertNotIn("sk-test-secret", markdown) + self.assertNotIn(r"C:\Users\Administrator", markdown) + self.assertIn("", markdown) + self.assertIn("", markdown) + + def test_prepare_sessions_records_progress_and_skips_processed_files(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + sessions = tmp_path / "sessions" + sessions.mkdir() + state = DistillState(tmp_path / "state") + session = sessions / "rollout.jsonl" + _write_jsonl( + session, + [ + {"timestamp": "2026-05-09T12:00:00Z", "type": "session_meta", "payload": {"cwd": "repo"}}, + _function_call("shell_command", {"command": "rg -n target ."}), + _function_call("apply_patch", {"patch": "*** Begin Patch\n*** End Patch"}), + _function_call("shell_command", {"command": "python -m unittest discover -s tests"}), + _function_output("call_shell_command", "Exit code: 0\nOK"), + ], + ) + + first = prepare_sessions([sessions], state, limit=10, min_quality=0.1) + second = prepare_sessions([sessions], state, limit=10, min_quality=0.1) + + self.assertEqual(len(first), 1) + self.assertEqual(second, []) + progress = state.load_progress() + only_record = 
next(iter(progress["sessions"].values())) + self.assertEqual(only_record["status"], "prepared") + self.assertEqual(only_record["learn_count"], 0) + instructions = deep_distill_instructions(first, state) + self.assertIn("LLM is the distillation core", instructions) + self.assertIn("codex_lesson_update", instructions) + self.assertIn(".md", instructions) + + def test_prepare_sessions_processes_files_in_stable_path_order_not_random_order(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + sessions = tmp_path / "sessions" + (sessions / "2026" / "05").mkdir(parents=True) + (sessions / "2026" / "04").mkdir(parents=True) + for rel in ("2026/05/b.jsonl", "2026/04/a.jsonl"): + _write_jsonl( + sessions / rel, + [ + {"timestamp": "2026-05-09T12:00:00Z", "type": "session_meta", "payload": {"cwd": "repo"}}, + _function_call("shell_command", {"command": "rg -n target ."}), + _function_call("apply_patch", {"patch": "*** Begin Patch\n*** End Patch"}), + _function_call("shell_command", {"command": "python -m unittest discover -s tests"}), + _function_output("call_shell_command", "Exit code: 0\nOK"), + ], + ) + + packets = prepare_sessions([sessions], DistillState(tmp_path / "state"), limit=1, min_quality=0.1) + + self.assertEqual(len(packets), 1) + self.assertTrue(packets[0].source.endswith("2026\\04\\a.jsonl") or packets[0].source.endswith("2026/04/a.jsonl")) + + def test_learn_from_packets_merges_repeated_lessons_and_render_sop(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + state = DistillState(tmp_path / "state") + queue = state.queue_dir + queue.mkdir(parents=True, exist_ok=True) + + packet_body = { + "session_hash": "abc123", + "source": "rollout.jsonl", + "quality": 0.9, + "focus": "workflow", + "lessons": [ + { + "id": "repo_probe_before_edit", + "category": "workflow", + "title": "改代码前先探测仓库事实", + "guidance": "编码任务先读项目规范、状态和相关代码,再做最小补丁。", + "signals": ["fast_search", "patch"], + } + ], + } + for index in range(2): + 
current = dict(packet_body) + current["session_hash"] = f"abc12{index}" + with open(queue / f"packet-{index}.json", "w", encoding="utf-8") as f: + json.dump(current, f, ensure_ascii=False) + + learned = learn_from_packets(state, limit=10) + sop = render_sop(state, tmp_path / "codex_coding_sop.md") + lessons = [json.loads(line) for line in state.lessons_path.read_text(encoding="utf-8").splitlines()] + + self.assertEqual(learned, 2) + self.assertEqual(len(lessons), 1) + self.assertEqual(lessons[0]["evidence_count"], 2) + self.assertIn("改代码前先探测仓库事实", sop) + self.assertIn("证据: 2", sop) + + def test_learn_from_packets_does_not_promote_rule_seed_observations(self): + with tempfile.TemporaryDirectory() as tmp: + tmp_path = Path(tmp) + state = DistillState(tmp_path / "state") + state.queue_dir.mkdir(parents=True, exist_ok=True) + with open(state.queue_dir / "packet.json", "w", encoding="utf-8") as f: + json.dump( + { + "session_hash": "seed123", + "source": "rollout.jsonl", + "quality": 0.9, + "focus": "workflow", + "seed_observations": [ + { + "id": "repo_probe_before_edit", + "category": "workflow", + "title": "Probe repository facts before editing", + "guidance": "Inspect project context before patching.", + "signals": ["repo_probe", "patch"], + } + ], + "lessons": [], + }, + f, + ensure_ascii=False, + ) + + learned = learn_from_packets(state, limit=10) + lessons = state.lessons_path.read_text(encoding="utf-8").splitlines() if state.lessons_path.exists() else [] + + self.assertEqual(learned, 0) + self.assertEqual(lessons, []) + self.assertTrue((state.queue_dir / "packet.json").exists()) + + def test_codex_lesson_update_validates_redacts_and_promotes_candidates(self): + with tempfile.TemporaryDirectory() as tmp: + state = DistillState(Path(tmp) / "state") + + result = codex_lesson_update( + state, + title="保护用户未提交改动", + guidance="改代码前检查 git status,遇到无关改动只绕开,不重置或覆盖。", + category="git", + evidence=["git_status", "patch"], + source_hash="abc123", + confidence=0.9, + ) + + 
self.assertEqual(result["status"], "candidate_recorded") + self.assertEqual(result["candidate"]["id"], "git_protect_user_worktree") + promoted = promote_candidates(state, min_evidence=1, min_confidence=0.85) + self.assertEqual(promoted, 1) + sop = render_sop(state, Path(tmp) / "sop.md") + self.assertIn("保护用户未提交改动", sop) + + bad = codex_lesson_update( + state, + title="泄露路径", + guidance=r"读取 C:\Users\Administrator\secret\.env 里的 token=abc", + category="security", + evidence=["secret"], + source_hash="abc124", + confidence=0.99, + ) + + self.assertEqual(bad["status"], "rejected") + self.assertIn("sensitive", bad["reason"]) + + def test_candidate_promotion_requires_independent_sources(self): + with tempfile.TemporaryDirectory() as tmp: + state = DistillState(Path(tmp) / "state") + + for _ in range(3): + codex_lesson_update( + state, + title="Keep branch cleanup separate from feature edits", + guidance="When finishing coding work, separate cleanup or branch management from the behavior patch so review can isolate risk.", + category="git", + evidence=["git_status"], + source_hash="same-session-hash", + confidence=0.7, + ) + self.assertEqual(promote_candidates(state, min_evidence=2, min_confidence=0.95), 0) + + codex_lesson_update( + state, + title="Keep branch cleanup separate from feature edits", + guidance="When finishing coding work, separate cleanup or branch management from the behavior patch so review can isolate risk.", + category="git", + evidence=["git_status", "handoff"], + source_hash="different-session-hash", + confidence=0.7, + ) + self.assertEqual(promote_candidates(state, min_evidence=2, min_confidence=0.95), 1) + + def test_promote_cli_accepts_output_for_isolated_rendering(self): + parser = build_parser() + + args = parser.parse_args([ + "--state-dir", + "tmp-state", + "promote", + "--output", + "tmp-sop.md", + ]) + + self.assertEqual(args.output, "tmp-sop.md") + + +if __name__ == "__main__": + unittest.main() diff --git 
# ---------------------------------------------------------------------------
# tests/test_native_image_input.py (new file in this patch)
#
# Covers the native multimodal (image) input path: agentmain content building,
# llmcore message conversion, NativeToolClient pass-through, and the agent
# runner's initial multimodal content.
# ---------------------------------------------------------------------------
import base64
import sys
import tempfile
import unittest
from pathlib import Path


REPO_ROOT = Path(__file__).resolve().parents[1]
if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))


from agentmain import _build_user_content_with_images, _native_image_input_enabled  # noqa: E402
from agent_loop import agent_runner_loop  # noqa: E402
from llmcore import NativeOAISession, NativeToolClient, _msgs_claude2oai, _to_responses_input  # noqa: E402


class NativeImageInputTest(unittest.TestCase):
    """Behavior of the opt-in native image input pipeline."""

    def test_native_image_input_only_enabled_for_configured_native_oai(self):
        backend = NativeOAISession({
            "apikey": "sk-test",
            "apibase": "https://example.test/v1",
            "model": "gpt-5.5",
            "native_image_input": True,
        })

        self.assertTrue(_native_image_input_enabled(type("Client", (), {"backend": backend})()))

        # A mixin backend delegates the check to its primary backend.
        mixin_backend = type("MixinBackend", (), {"primary": backend})()
        self.assertTrue(_native_image_input_enabled(type("Client", (), {"backend": mixin_backend})()))

        backend.native_image_input = False
        self.assertFalse(_native_image_input_enabled(type("Client", (), {"backend": backend})()))

        self.assertFalse(_native_image_input_enabled(type("Client", (), {"backend": object()})()))

    def test_agentmain_builds_native_image_blocks_from_path(self):
        fake_png = b"\x89PNG\r\n\x1a\nfake"
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as f:
            f.write(fake_png)
            path = Path(f.name)
        try:
            content = _build_user_content_with_images(f'"{path}" 这图内容是?')
        finally:
            path.unlink(missing_ok=True)

        text_block, image_block = content[0], content[1]
        self.assertEqual(text_block["type"], "text")
        self.assertIn("这图内容是", text_block["text"])
        self.assertEqual(image_block["type"], "image")
        self.assertEqual(image_block["source"]["type"], "base64")
        self.assertEqual(image_block["source"]["media_type"], "image/png")
        self.assertEqual(base64.b64decode(image_block["source"]["data"]), fake_png)

    def test_agentmain_leaves_plain_text_on_original_path(self):
        # No image paths in the text -> None, so the caller keeps plain text.
        self.assertIsNone(_build_user_content_with_images("你好"))

    def test_openai_converter_collapses_text_only_user_content(self):
        chat = _msgs_claude2oai([
            {"role": "user", "content": [{"type": "text", "text": "hi"}]}
        ])

        self.assertEqual(chat[0]["content"], "hi")

    def test_openai_converters_preserve_native_image_blocks(self):
        msg = {
            "role": "user",
            "content": [
                {"type": "text", "text": "描述图片"},
                {"type": "image", "source": {"type": "base64", "media_type": "image/png", "data": "AAAA"}},
            ],
        }

        chat = _msgs_claude2oai([msg])
        converted = chat[0]["content"][1]
        self.assertEqual(converted["type"], "image_url")
        self.assertTrue(converted["image_url"]["url"].startswith("data:image/png;base64,"))

        responses = _to_responses_input(chat)
        self.assertEqual(responses[0]["content"][1]["type"], "input_image")

    def test_native_tool_client_keeps_non_text_content_blocks(self):
        class Backend:
            name = "fake"
            history = []
            system = ""
            tools = None

            def ask(self, merged):
                self.merged = merged
                if False:  # generator with no yields
                    yield ""
                return None

        backend = Backend()
        client = NativeToolClient(backend)
        multimodal = [
            {"type": "text", "text": "看图"},
            {"type": "image_url", "image_url": {"url": "data:image/png;base64,AAAA"}},
        ]
        list(client.chat(messages=[{"role": "user", "content": multimodal}]))

        self.assertEqual(backend.merged["content"][1]["type"], "image_url")

    def test_native_tool_client_can_disable_native_tools(self):
        class Backend:
            name = "fake"
            history = []
            system = ""
            tools = "unset"
            native_tools = False

            def ask(self, merged):
                self.merged = merged
                if False:  # generator with no yields
                    yield ""
                return None

        backend = Backend()
        client = NativeToolClient(backend)
        tools = [{"type": "function", "function": {"name": "code_run", "parameters": {"type": "object", "properties": {}}}}]
        list(client.chat(messages=[{"role": "system", "content": "sys"}, {"role": "user", "content": "hi"}], tools=tools))

        # Tools are unset on the backend and mounted into the system prompt instead.
        self.assertIsNone(backend.tools)
        self.assertIn("Tools (mounted", backend.system)
        self.assertEqual(backend.merged["content"][0]["text"], "hi")

    def test_agent_runner_uses_initial_multimodal_content(self):
        class Response:
            content = "ok"
            tool_calls = []

        class Client:
            last_tools = ""

            def chat(self, messages, tools=None):
                self.messages = messages
                if False:  # generator with no yields
                    yield ""
                return Response()

        class Handler:
            max_turns = 1
            _done_hooks = []
            parent = type("Parent", (), {"task_dir": None})()

            def dispatch(self, *args, **kwargs):
                if False:  # generator with no yields
                    yield None
                from agent_loop import StepOutcome

                return StepOutcome("done", next_prompt=None)

            def turn_end_callback(self, *args, **kwargs):
                return ""

        client = Client()
        content = [{"type": "text", "text": "看图"}, {"type": "image_url", "image_url": {"url": "data:image/png;base64,AAAA"}}]
        list(agent_runner_loop(client, "sys", "fallback", Handler(), [], max_turns=1, verbose=False, initial_user_content=content))

        # The first user message must carry the multimodal content untouched.
        self.assertEqual(client.messages[1]["content"], content)


if __name__ == "__main__":
    unittest.main()


# ---------------------------------------------------------------------------
# tests/test_tui_input_history.py (new file in this patch)
#
# Shared input-history navigation behavior of both TUI input widgets.
# ---------------------------------------------------------------------------
import sys
import unittest
from pathlib import Path


REPO_ROOT = Path(__file__).resolve().parents[1]
FRONTENDS = REPO_ROOT / "frontends"
for path in (str(REPO_ROOT), str(FRONTENDS)):
    if path not in sys.path:
        sys.path.insert(0, path)

# Drop stale stub modules (no __file__) left behind by other tests so the
# real frontend modules import cleanly.
for name in ("agentmain", "chatapp_common", "continue_cmd", "llmcore"):
    module = sys.modules.get(name)
    if module is not None and not getattr(module, "__file__", None):
        sys.modules.pop(name)

from tuiapp import PromptInput  # noqa: E402
from tuiapp_v2 import InputArea  # noqa: E402


class InputHistoryTest(unittest.TestCase):
    """Up/down history navigation must round-trip and restore the draft."""

    def assert_history_navigation(self, input_cls):
        inp = input_cls()
        inp.add_history("first")
        inp.add_history("second")

        inp.text = "draft"
        # Walking back: newest entry first, clamped at the oldest.
        self.assertTrue(inp.show_previous_history())
        self.assertEqual(inp.text, "second")
        self.assertTrue(inp.show_previous_history())
        self.assertEqual(inp.text, "first")
        self.assertTrue(inp.show_previous_history())
        self.assertEqual(inp.text, "first")

        # Walking forward restores the unsent draft at the end.
        self.assertTrue(inp.show_next_history())
        self.assertEqual(inp.text, "second")
        self.assertTrue(inp.show_next_history())
        self.assertEqual(inp.text, "draft")
        self.assertFalse(inp.show_next_history())

    def test_prompt_input_history_navigation_restores_draft(self):
        self.assert_history_navigation(PromptInput)

    def test_input_area_history_navigation_restores_draft(self):
        self.assert_history_navigation(InputArea)

    def test_input_history_ignores_blank_and_consecutive_duplicates(self):
        inp = InputArea()
        inp.add_history("")
        inp.add_history("same")
        inp.add_history("same")
        inp.add_history("next")

        self.assertEqual(inp._history, ["same", "next"])


# ---------------------------------------------------------------------------
# tests/test_tui_recent_sessions.py (new file in this patch)
#
# Sidebar layout helpers of the v2 TUI: recent-session limits, row counts,
# width clamping and resizer hit-testing.
# ---------------------------------------------------------------------------
import sys
import unittest
from pathlib import Path


REPO_ROOT = Path(__file__).resolve().parents[1]
FRONTENDS = REPO_ROOT / "frontends"
for path in (str(REPO_ROOT), str(FRONTENDS)):
    if path not in sys.path:
        sys.path.insert(0, path)

# Drop stale stub modules (no __file__) left behind by other tests so the
# real frontend modules import cleanly.
for name in ("agentmain", "chatapp_common", "continue_cmd", "llmcore"):
    module = sys.modules.get(name)
    if module is not None and not getattr(module, "__file__", None):
        sys.modules.pop(name)


from tuiapp_v2 import (  # noqa: E402
    AgentSession,
    _clamp_sidebar_width,
    _is_sidebar_resizer_hit,
    _recent_sidebar_sessions,
    _recent_preview_width,
    _session_rows,
    _tui_recent_sessions_limit,
)


class TUIRecentSessionsTest(unittest.TestCase):
    """Pure-function sidebar helpers of the v2 TUI frontend."""

    def test_recent_limit_defaults_and_clamps_invalid_values(self):
        self.assertEqual(_tui_recent_sessions_limit({}), 10)
        self.assertEqual(_tui_recent_sessions_limit({"tui_recent_sessions_limit": "3"}), 3)
        self.assertEqual(_tui_recent_sessions_limit({"tui_recent_sessions_limit": 0}), 10)
        self.assertEqual(_tui_recent_sessions_limit({"tui_recent_sessions_limit": "bad"}), 10)

    def test_recent_sidebar_sessions_respects_limit(self):
        sessions = [(f"path-{i}", 100 - i, f"preview-{i}", i + 1) for i in range(12)]

        recent = _recent_sidebar_sessions(sessions, 10)

        self.assertEqual(len(recent), 10)
        self.assertEqual(recent[0][0], "path-0")
        self.assertEqual(recent[-1][0], "path-9")

    def test_session_rows_include_preview_lines(self):
        sess = AgentSession(agent_id=1, name="main", agent=object())

        self.assertEqual(_session_rows(sess), 3)

        sess.messages.append(type("Msg", (), {"role": "user", "content": "hello"})())
        self.assertEqual(_session_rows(sess), 4)

    def test_sidebar_width_is_clamped_to_safe_layout_range(self):
        self.assertEqual(_clamp_sidebar_width(10, 120), 24)
        self.assertEqual(_clamp_sidebar_width(40, 120), 40)
        self.assertEqual(_clamp_sidebar_width(90, 120), 70)
        self.assertEqual(_clamp_sidebar_width(50, 80), 40)

    def test_sidebar_resizer_hit_uses_boundary_coordinate(self):
        self.assertTrue(_is_sidebar_resizer_hit(33, 34))
        self.assertTrue(_is_sidebar_resizer_hit(34, 34))
        self.assertTrue(_is_sidebar_resizer_hit(35, 34))
        self.assertFalse(_is_sidebar_resizer_hit(36, 34))

    def test_recent_preview_width_tracks_sidebar_width(self):
        self.assertEqual(_recent_preview_width(30), 12)
        self.assertEqual(_recent_preview_width(50), 28)
        self.assertEqual(_recent_preview_width(70), 48)


if __name__ == "__main__":
    unittest.main()