From 03baedbcae26c484d27485a4bac3b300adaa80cf Mon Sep 17 00:00:00 2001 From: Codex Date: Mon, 1 Jun 2026 19:24:09 +0800 Subject: [PATCH 1/2] Add standalone Analyzer runner --- .gitignore | 1 + examples/scripts/run_analyzer_standalone.py | 80 ++++++++++++ loopai/agents/Analyzer/__init__.py | 3 +- loopai/agents/Analyzer/readme.md | 124 ++++++++++++++++++ loopai/agents/Analyzer/standalone.py | 138 ++++++++++++++++++++ loopai/agents/__init__.py | 38 +++++- 6 files changed, 378 insertions(+), 6 deletions(-) create mode 100644 examples/scripts/run_analyzer_standalone.py create mode 100644 loopai/agents/Analyzer/standalone.py diff --git a/.gitignore b/.gitignore index b0bab0d..a527e34 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ __pycache__/ +.venv/ api_key.txt data/ output/ diff --git a/examples/scripts/run_analyzer_standalone.py b/examples/scripts/run_analyzer_standalone.py new file mode 100644 index 0000000..57a6643 --- /dev/null +++ b/examples/scripts/run_analyzer_standalone.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +import sys +from typing import Any, Dict + +from omegaconf import OmegaConf + + +_REPO_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..")) +if _REPO_ROOT not in sys.path: + sys.path.insert(0, _REPO_ROOT) + +from loopai.agents.Analyzer import ANALYZER_NODE_NAMES, run_analyzer_standalone + + +def _load_state(config_path: str) -> Dict[str, Any]: + if config_path.endswith(".json"): + with open(config_path, "r", encoding="utf-8") as f: + cfg = json.load(f) + return cfg.get("default_states", cfg) + + cfg = OmegaConf.load(config_path) + state_cfg = cfg.default_states if "default_states" in cfg else cfg + return OmegaConf.to_container(state_cfg, resolve=True) + + +def _parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run AnalyzerAgent directly with LangGraph checkpoint resume support." + ) + parser.add_argument( + "--config-path", + required=True, + help="Path to a YAML/JSON state config. If it contains default_states, that mapping is used as state.", + ) + parser.add_argument( + "--thread-id", + default="analyzer-default", + help="LangGraph checkpoint thread_id.", + ) + parser.add_argument( + "--resume", + action="store_true", + help="Resume with the same thread_id from the latest checkpoint.", + ) + parser.add_argument( + "--from-node", + choices=ANALYZER_NODE_NAMES, + default=None, + help="Resume from a specific Analyzer node using checkpoint history when available.", + ) + parser.add_argument( + "--print-result", + action="store_true", + help="Print the final state/result as JSON.", + ) + return parser.parse_args() + + +def main() -> None: + args = _parse_args() + state = None if args.resume else _load_state(args.config_path) + + result = run_analyzer_standalone( + state=state, + thread_id=args.thread_id, + resume=args.resume, + from_node=args.from_node, + ) + + if args.print_result: + print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) + + +if __name__ == "__main__": + main() diff --git a/loopai/agents/Analyzer/__init__.py b/loopai/agents/Analyzer/__init__.py index 040db30..b46f59d 100644 --- a/loopai/agents/Analyzer/__init__.py +++ b/loopai/agents/Analyzer/__init__.py @@ -1 +1,2 @@ -from .analyzer_agent import AnalyzerAgent \ No newline at end of file +from .analyzer_agent import AnalyzerAgent +from .standalone import ANALYZER_NODE_NAMES, run_analyzer_standalone diff --git a/loopai/agents/Analyzer/readme.md b/loopai/agents/Analyzer/readme.md index 918e82f..4359fc0 100644 --- a/loopai/agents/Analyzer/readme.md +++ b/loopai/agents/Analyzer/readme.md @@ -580,3 +580,127 @@ Analyzer 与 Judger 配合后,可以形成完整的: ``` 闭环流程。 + +--- + +## 十、独立运行与断点续跑 + +Analyzer 现在支持不经过 Starter 独立运行。独立入口只封装运行控制,不改变 Analyzer 原有业务节点、graph 拓扑、state 字段结构、输入格式或输出格式。 + +### 1. Python runner + +可以直接调用: + +```python +from loopai.agents import run_analyzer_standalone + +result = run_analyzer_standalone( + state={ + "task_id": "analyzer-demo", + "output_dir": "./outputs", + "eval": {}, + "analyzer": { + "analyze_task_type": "code", + "eval_result_path": "./outputs/result.jsonl", + "analyze_model_path": "gpt-4o-mini", + "analyze_base_url": "http://127.0.0.1:8000/v1", + "analyze_api_key": "EMPTY", + "analyze_temperature": 0.0, + "analyze_top_p": 0.95, + "analyze_batch_size": 20, + "analyze_max_concurrency": 5, + "analyze_chunk_size": 50, + "analyze_sampling_top_k": 5, + "output_brief": True, + "output_suggestion": True, + "quick_brief": True, + "quick_brief_limit": 10 + } + }, + thread_id="analyzer-demo-001", +) +``` + +runner 内部仍然使用: + +```python +AnalyzerAgent(checkpointer=checkpointer, store=store) +graph.invoke(...) +``` + +返回值为 Analyzer graph 的最终 state / result。 + +### 2. CLI 入口 + +新增命令行入口: + +```bash +python examples/scripts/run_analyzer_standalone.py \ + --config-path examples/config/starter.yaml \ + --thread-id analyzer-test-001 +``` + +CLI 支持参数: + +- `--config-path`:YAML / JSON 配置路径。如果配置中包含 `default_states`,则使用 `default_states` 作为输入 state。 +- `--thread-id`:LangGraph checkpoint 使用的 thread id。 +- `--resume`:使用相同 `thread_id` 从 checkpoint 恢复。 +- `--from-node`:从指定 Analyzer 节点继续运行。 +- `--print-result`:将最终 state / result 打印为 JSON。 + +### 3. 断点续跑 + +使用相同 `thread_id` 恢复最近 checkpoint: + +```bash +python examples/scripts/run_analyzer_standalone.py \ + --config-path examples/config/starter.yaml \ + --thread-id analyzer-test-001 \ + --resume +``` + +从指定节点继续运行: + +```bash +python examples/scripts/run_analyzer_standalone.py \ + --config-path examples/config/starter.yaml \ + --thread-id analyzer-test-001 \ + --resume \ + --from-node analyze_result +``` + +当前可用 Analyzer 节点名: + +```text +check_required_fields +route_eval +eval_model +metric_recommend +metric_score +analyze_metric_report +analyze_result +draw_conclusion +finish +``` + +`from_node` 会优先使用 LangGraph checkpoint history 中“下一步将执行该节点”的快照继续运行,避免从头重跑已完成节点。如果没有可用历史快照,则使用 `graph.update_state(..., as_node=previous_node)` 作为兼容兜底。 + +### 4. state 兼容性 + +独立运行入口保持现有 state 格式不变,例如: + +```python +{ + "task_id": "...", + "output_dir": "...", + "eval": {...}, + "analyzer": {...} +} +``` + +runner 不重命名字段、不删除字段、不改变节点之间传递的 state 结构。 + +### 5. 注意事项 + +- 当前默认 `loopai.memory.checkpointer` 是内存型 checkpointer,因此跨进程 CLI 续跑需要替换为持久化 checkpointer 后才能保留历史 checkpoint。 +- 如果 Analyzer 在 standalone 模式下遇到原本要跳回 Starter 父图的配置补全流程,runner 会返回对应的 state update,例如 `exception=ConfigerError`、`next_to=config_node` 和 `configer.configer_error`,便于独立调试。 diff --git a/loopai/agents/Analyzer/standalone.py b/loopai/agents/Analyzer/standalone.py new file mode 100644 index 0000000..fd780bf --- /dev/null +++ b/loopai/agents/Analyzer/standalone.py @@ -0,0 +1,138 @@ +from __future__ import annotations + +from typing import Any, Dict, Iterable, Optional + +from langgraph.errors import ParentCommand + +from loopai.memory import checkpointer, store +from .analyzer_agent import AnalyzerAgent + + +ANALYZER_NODE_NAMES = ( + "check_required_fields", + "route_eval", + "eval_model", + "metric_recommend", + "metric_score", + "analyze_metric_report", + "analyze_result", + "draw_conclusion", + "finish", +) + + +_PREVIOUS_NODE_BY_NODE = { + "route_eval": "check_required_fields", + "eval_model": "route_eval", + "metric_recommend": "route_eval", + "metric_score": "metric_recommend", + "analyze_metric_report": "metric_score", + "analyze_result": "eval_model", + "draw_conclusion": "analyze_result", + "finish": "draw_conclusion", +} + + +def _build_config(thread_id: str) -> Dict[str, Dict[str, str]]: + return {"configurable": {"thread_id": thread_id}} + + +def _snapshot_next(snapshot: Any) -> Iterable[str]: + next_nodes = getattr(snapshot, "next", None) + return next_nodes or () + + +def _find_checkpoint_before_node(graph: Any, config: Dict[str, Any], node_name: str) -> Optional[Any]: + get_state_history = getattr(graph, "get_state_history", None) + if get_state_history is None: + return None + + for snapshot in get_state_history(config): + if node_name in _snapshot_next(snapshot): + return snapshot + return None + + +def _latest_state_values(graph: Any, config: Dict[str, Any]) -> Optional[Dict[str, Any]]: + get_state = getattr(graph, "get_state", None) + if get_state is None: + return None + + snapshot = get_state(config) + values = getattr(snapshot, "values", None) + if values is None: + return None + return values + + +def _invoke_graph(graph: Any, input_state: Optional[Dict[str, Any]], config: Dict[str, Any]) -> Dict[str, Any]: + try: + return graph.invoke(input_state, config=config) + except ParentCommand as exc: + command = exc.args[0] if exc.args else None + update = getattr(command, "update", None) + if isinstance(update, dict): + return update + raise + + +def _invoke_from_node( + graph: Any, + state: Optional[Dict[str, Any]], + config: Dict[str, Any], + from_node: str, +) -> Dict[str, Any]: + if from_node not in ANALYZER_NODE_NAMES: + available = ", ".join(ANALYZER_NODE_NAMES) + raise ValueError(f"Unknown Analyzer node: {from_node}. Available nodes: {available}") + + checkpoint = _find_checkpoint_before_node(graph, config, from_node) + if checkpoint is not None: + return _invoke_graph(graph, None, checkpoint.config) + + base_state = state or _latest_state_values(graph, config) + if base_state is None: + raise ValueError( + "No checkpoint state found. Pass state for the first run, or run once with the same thread_id before resuming." + ) + + if from_node == "check_required_fields": + return _invoke_graph(graph, base_state, config) + + update_state = getattr(graph, "update_state", None) + if update_state is None: + raise RuntimeError("This LangGraph version does not expose update_state; cannot resume from a node fallback.") + + # Fallback for threads without checkpoint history: write the provided/latest state + # as if the previous node completed, then let LangGraph schedule from_node. + previous_node = _PREVIOUS_NODE_BY_NODE[from_node] + update_state(config, base_state, as_node=previous_node) + return _invoke_graph(graph, None, config) + + +def run_analyzer_standalone( + state: Optional[Dict[str, Any]], + thread_id: str = "analyzer-default", + resume: bool = False, + from_node: Optional[str] = None, +) -> Dict[str, Any]: + """Run AnalyzerAgent directly with LangGraph checkpoint controls. + + The input state is passed through unchanged. This wrapper only controls + thread_id, checkpoint resume, and optional node-level restart behavior. + """ + + sg = AnalyzerAgent(checkpointer=checkpointer, store=store) + graph = sg() + config = _build_config(thread_id) + + if from_node is not None: + return _invoke_from_node(graph, state, config, from_node) + + if resume: + return _invoke_graph(graph, None, config) + + if state is None: + raise ValueError("state is required when resume is false.") + + return _invoke_graph(graph, state, config) diff --git a/loopai/agents/__init__.py b/loopai/agents/__init__.py index b9e7cdb..f4aed7b 100644 --- a/loopai/agents/__init__.py +++ b/loopai/agents/__init__.py @@ -1,6 +1,34 @@ from .BaseAgent import BaseAgent -from .Starter.starter_agent import StarterAgent -from .Judger.judger_agent import JudgerAgent -from .Analyzer.analyzer_agent import AnalyzerAgent -from .Obtainer.obtainer_agent import ObtainerAgent -from .Trainer.trainer_agent import TrainerAgent + + +__all__ = [ + "BaseAgent", + "StarterAgent", + "JudgerAgent", + "AnalyzerAgent", + "run_analyzer_standalone", + "ObtainerAgent", + "TrainerAgent", +] + + +def __getattr__(name): + if name == "StarterAgent": + from .Starter.starter_agent import StarterAgent + return StarterAgent + if name == "JudgerAgent": + from .Judger.judger_agent import JudgerAgent + return JudgerAgent + if name == "AnalyzerAgent": + from .Analyzer.analyzer_agent import AnalyzerAgent + return AnalyzerAgent + if name == "run_analyzer_standalone": + from .Analyzer.standalone import run_analyzer_standalone + return run_analyzer_standalone + if name == "ObtainerAgent": + from .Obtainer.obtainer_agent import ObtainerAgent + return ObtainerAgent + if name == "TrainerAgent": + from .Trainer.trainer_agent import TrainerAgent + return TrainerAgent + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") From f08f4eea321e5e6a289425fb539cef9b9164e63c Mon Sep 17 00:00:00 2001 From: Justin Date: Tue, 2 Jun 2026 17:37:51 +0800 Subject: [PATCH 2/2] Add Analyzer standalone SQLite resume --- examples/scripts/run_analyzer_standalone.py | 93 +++++++++-- loopai/agents/Analyzer/__init__.py | 2 +- loopai/agents/Analyzer/analyzer_agent.py | 6 +- loopai/agents/Analyzer/readme.md | 10 +- loopai/agents/Analyzer/standalone.py | 166 +++++++++++--------- setup.py | 1 + 6 files changed, 191 insertions(+), 87 deletions(-) diff --git a/examples/scripts/run_analyzer_standalone.py b/examples/scripts/run_analyzer_standalone.py index 57a6643..f8fac08 100644 --- a/examples/scripts/run_analyzer_standalone.py +++ b/examples/scripts/run_analyzer_standalone.py @@ -4,9 +4,11 @@ import argparse import json import os +import sqlite3 import sys from typing import Any, Dict +from langgraph.checkpoint.sqlite import SqliteSaver from omegaconf import OmegaConf @@ -14,7 +16,14 @@ if _REPO_ROOT not in sys.path: sys.path.insert(0, _REPO_ROOT) -from loopai.agents.Analyzer import ANALYZER_NODE_NAMES, run_analyzer_standalone +from loopai.agents.Analyzer import ( + ANALYZER_NODE_NAMES, + get_analyzer_checkpoint_state, + run_analyzer_standalone, +) + + +_DEFAULT_CHECKPOINT_PATH = "outputs/analyzer_checkpoints.sqlite" def _load_state(config_path: str) -> Dict[str, Any]: @@ -28,6 +37,33 @@ def _load_state(config_path: str) -> Dict[str, Any]: return OmegaConf.to_container(state_cfg, resolve=True) +def _redact_key_fields(value: Any) -> Any: + if isinstance(value, dict): + redacted = {} + for key, child in value.items(): + key_name = str(key).lower() + if key_name in {"api_key", "analyze_api_key"} or key_name.endswith("_key"): + redacted[key] = "***REDACTED***" + else: + redacted[key] = _redact_key_fields(child) + return redacted + if isinstance(value, list): + return [_redact_key_fields(item) for item in value] + return value + + +def _build_sqlite_checkpointer(checkpoint_path: str) -> SqliteSaver: + checkpoint_dir = os.path.dirname(checkpoint_path) + if checkpoint_dir: + os.makedirs(checkpoint_dir, exist_ok=True) + conn = sqlite3.connect(checkpoint_path, check_same_thread=False) + checkpointer = SqliteSaver(conn) + setup = getattr(checkpointer, "setup", None) + if setup is not None: + setup() + return checkpointer + + def _parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( description="Run AnalyzerAgent directly with LangGraph checkpoint resume support." @@ -42,6 +78,11 @@ def _parse_args() -> argparse.Namespace: default="analyzer-default", help="LangGraph checkpoint thread_id.", ) + parser.add_argument( + "--checkpoint-path", + default=_DEFAULT_CHECKPOINT_PATH, + help="SQLite checkpoint file path for cross-process resume.", + ) parser.add_argument( "--resume", action="store_true", @@ -49,9 +90,12 @@ def _parse_args() -> argparse.Namespace: ) parser.add_argument( "--from-node", - choices=ANALYZER_NODE_NAMES, default=None, - help="Resume from a specific Analyzer node using checkpoint history when available.", + help=( + "Resume from a specific Analyzer node using checkpoint history when available. " + f"Available nodes: {', '.join(ANALYZER_NODE_NAMES)}. " + "Legacy names like analyze_result_node are also accepted." + ), ) parser.add_argument( "--print-result", @@ -63,17 +107,46 @@ def _parse_args() -> argparse.Namespace: def main() -> None: args = _parse_args() + checkpointer = _build_sqlite_checkpointer(args.checkpoint_path) state = None if args.resume else _load_state(args.config_path) - result = run_analyzer_standalone( - state=state, - thread_id=args.thread_id, - resume=args.resume, - from_node=args.from_node, - ) + if args.resume: + try: + checkpoint_state = get_analyzer_checkpoint_state( + args.thread_id, + checkpointer=checkpointer, + checkpoint_path=args.checkpoint_path, + ) + print( + f"[AnalyzerStandalone] resume checkpoint found: " + f"thread_id={args.thread_id}, " + f"current={checkpoint_state.get('current') or checkpoint_state.get('next_to') or 'unknown'}", + flush=True, + ) + except RuntimeError as exc: + print( + f"[AnalyzerStandalone] {exc}. " + f"Please run once without --resume using thread_id={args.thread_id} " + f"and checkpoint_path={args.checkpoint_path} first.", + flush=True, + ) + raise SystemExit(1) + + try: + result = run_analyzer_standalone( + state=state, + thread_id=args.thread_id, + resume=args.resume, + from_node=args.from_node, + checkpointer=checkpointer, + checkpoint_path=args.checkpoint_path, + ) + except RuntimeError as exc: + print(f"[AnalyzerStandalone] {exc}", flush=True) + raise SystemExit(1) if args.print_result: - print(json.dumps(result, ensure_ascii=False, indent=2, default=str)) + print(json.dumps(_redact_key_fields(result), ensure_ascii=False, indent=2, default=str)) if __name__ == "__main__": diff --git a/loopai/agents/Analyzer/__init__.py b/loopai/agents/Analyzer/__init__.py index b46f59d..c6bf3ed 100644 --- a/loopai/agents/Analyzer/__init__.py +++ b/loopai/agents/Analyzer/__init__.py @@ -1,2 +1,2 @@ from .analyzer_agent import AnalyzerAgent -from .standalone import ANALYZER_NODE_NAMES, run_analyzer_standalone +from .standalone import ANALYZER_NODE_NAMES, get_analyzer_checkpoint_state, run_analyzer_standalone diff --git a/loopai/agents/Analyzer/analyzer_agent.py b/loopai/agents/Analyzer/analyzer_agent.py index 68ad5f4..c47fba3 100644 --- a/loopai/agents/Analyzer/analyzer_agent.py +++ b/loopai/agents/Analyzer/analyzer_agent.py @@ -125,7 +125,7 @@ def finish_node(state: LoopAIState, runtime: Runtime[RuntimeContext]): return state return finish_node - def init_graph(self, **kwargs): + def init_graph(self, entry_point: Optional[str] = None, **kwargs): builder = StateGraph(LoopAIState) builder.add_node("check_required_fields", self.get_check_required_fields_node()) @@ -151,7 +151,7 @@ def init_graph(self, **kwargs): builder.add_edge("metric_score", "analyze_metric_report") builder.add_edge("analyze_metric_report", "finish") - builder.set_entry_point("check_required_fields") + builder.set_entry_point(entry_point or "check_required_fields") builder.set_finish_point("finish") self.graph = builder.compile( @@ -168,4 +168,4 @@ def __call__(self, **kwargs): kwargs: keyword arguments to pass to init_graph """ self.init_graph(**kwargs) - return self.graph \ No newline at end of file + return self.graph diff --git a/loopai/agents/Analyzer/readme.md b/loopai/agents/Analyzer/readme.md index 4359fc0..99b6dcc 100644 --- a/loopai/agents/Analyzer/readme.md +++ b/loopai/agents/Analyzer/readme.md @@ -644,6 +644,7 @@ CLI 支持参数: - `--config-path`:YAML / JSON 配置路径。如果配置中包含 `default_states`,则使用 `default_states` 作为输入 state。 - `--thread-id`:LangGraph checkpoint 使用的 thread id。 +- `--checkpoint-path`:SQLite checkpoint 文件路径,默认 `outputs/analyzer_checkpoints.sqlite`。 - `--resume`:使用相同 `thread_id` 从 checkpoint 恢复。 - `--from-node`:从指定 Analyzer 节点继续运行。 - `--print-result`:将最终 state / result 打印为 JSON。 @@ -656,6 +657,7 @@ CLI 支持参数: python examples/scripts/run_analyzer_standalone.py \ --config-path examples/config/starter.yaml \ --thread-id analyzer-test-001 \ + --checkpoint-path outputs/analyzer_checkpoints.sqlite \ --resume ``` @@ -665,8 +667,9 @@ python examples/scripts/run_analyzer_standalone.py \ python examples/scripts/run_analyzer_standalone.py \ --config-path examples/config/starter.yaml \ --thread-id analyzer-test-001 \ + --checkpoint-path outputs/analyzer_checkpoints.sqlite \ --resume \ - --from-node analyze_result + --from-node analyze_result_node ``` 当前可用 Analyzer 节点名: @@ -683,7 +686,7 @@ draw_conclusion finish ``` -`from_node` 会优先使用 LangGraph checkpoint history 中“下一步将执行该节点”的快照继续运行,避免从头重跑已完成节点。如果没有可用历史快照,则使用 `graph.update_state(..., as_node=previous_node)` 作为兼容兜底。 +`resume` 和 `from_node` 使用同一套恢复入口逻辑:先从 SQLite checkpoint 读取 snapshot,再根据 `snapshot.next`、`state["current"]` 或用户指定的 `--from-node` 决定恢复节点,并临时构建以该节点为 entry point 的 Analyzer graph。这样中断在 `draw_conclusion_node` 时,续跑只会执行 `draw_conclusion_node -> finish_node`,不会重新执行 `eval_model_node` 和 `analyze_result_node`。 ### 4. state 兼容性 @@ -702,5 +705,6 @@ runner 不重命名字段、不删除字段、不改变节点之间传递的 sta ### 5. 注意事项 -- 当前默认 `loopai.memory.checkpointer` 是内存型 checkpointer,因此跨进程 CLI 续跑需要替换为持久化 checkpointer 后才能保留历史 checkpoint。 +- CLI 默认使用 SQLite checkpointer,因此可以跨进程续跑。请保持 `--thread-id` 和 `--checkpoint-path` 一致。 +- `--print-result` 会脱敏 `api_key`、`analyze_api_key` 和所有 `*_key` 字段。 - 如果 Analyzer 在 standalone 模式下遇到原本要跳回 Starter 父图的配置补全流程,runner 会返回对应的 state update,例如 `exception=ConfigerError`、`next_to=config_node` 和 `configer.configer_error`,便于独立调试。 diff --git a/loopai/agents/Analyzer/standalone.py b/loopai/agents/Analyzer/standalone.py index fd780bf..d8c32f1 100644 --- a/loopai/agents/Analyzer/standalone.py +++ b/loopai/agents/Analyzer/standalone.py @@ -4,7 +4,8 @@ from langgraph.errors import ParentCommand -from loopai.memory import checkpointer, store +from loopai.memory import checkpointer as default_checkpointer +from loopai.memory import store as default_store from .analyzer_agent import AnalyzerAgent @@ -21,15 +22,16 @@ ) -_PREVIOUS_NODE_BY_NODE = { - "route_eval": "check_required_fields", - "eval_model": "route_eval", - "metric_recommend": "route_eval", - "metric_score": "metric_recommend", - "analyze_metric_report": "metric_score", - "analyze_result": "eval_model", - "draw_conclusion": "analyze_result", - "finish": "draw_conclusion", +_NODE_ALIASES = { + "check_required_fields_node": "check_required_fields", + "route_eval_node": "route_eval", + "eval_model_node": "eval_model", + "metric_recommend_node": "metric_recommend", + "metric_score_node": "metric_score", + "analyze_metric_report_node": "analyze_metric_report", + "analyze_result_node": "analyze_result", + "draw_conclusion_node": "draw_conclusion", + "finish_node": "finish", } @@ -37,35 +39,80 @@ def _build_config(thread_id: str) -> Dict[str, Dict[str, str]]: return {"configurable": {"thread_id": thread_id}} -def _snapshot_next(snapshot: Any) -> Iterable[str]: - next_nodes = getattr(snapshot, "next", None) - return next_nodes or () +def _state_is_finished(state: Dict[str, Any]) -> bool: + current = _normalize_node_name(str(state.get("current", ""))) + return current == "finish" -def _find_checkpoint_before_node(graph: Any, config: Dict[str, Any], node_name: str) -> Optional[Any]: - get_state_history = getattr(graph, "get_state_history", None) - if get_state_history is None: - return None +def _normalize_node_name(node_name: str) -> str: + node_name = str(node_name or "") + if node_name in ANALYZER_NODE_NAMES: + return node_name + if node_name in _NODE_ALIASES: + return _NODE_ALIASES[node_name] - for snapshot in get_state_history(config): - if node_name in _snapshot_next(snapshot): - return snapshot - return None + for alias, graph_node in _NODE_ALIASES.items(): + if alias in node_name: + return graph_node + for graph_node in sorted(ANALYZER_NODE_NAMES, key=len, reverse=True): + if graph_node in node_name: + return graph_node + return node_name -def _latest_state_values(graph: Any, config: Dict[str, Any]) -> Optional[Dict[str, Any]]: - get_state = getattr(graph, "get_state", None) - if get_state is None: - return None +def _resume_node_from_state(state: Dict[str, Any]) -> str: + resume_node = _normalize_node_name(state.get("current") or state.get("next_to") or "") + if resume_node not in ANALYZER_NODE_NAMES: + available = ", ".join(ANALYZER_NODE_NAMES) + raise RuntimeError( + f"Cannot resume Analyzer from current={state.get('current')!r}, " + f"next_to={state.get('next_to')!r}. Available nodes: {available}" + ) + return resume_node - snapshot = get_state(config) + +def _build_graph(checkpointer: Any, store: Any, entry_point: Optional[str] = None) -> Any: + sg = AnalyzerAgent(checkpointer=checkpointer or default_checkpointer, store=store or default_store) + return sg(entry_point=entry_point) + + +def _checkpoint_values( + graph: Any, + config: Dict[str, Any], + thread_id: str, + checkpoint_path: Optional[str] = None, +) -> Dict[str, Any]: + snapshot = graph.get_state(config) values = getattr(snapshot, "values", None) - if values is None: - return None + if not values: + if checkpoint_path: + raise RuntimeError(f"No checkpoint found for thread_id={thread_id} in checkpoint_path={checkpoint_path}") + raise RuntimeError(f"No checkpoint found for thread_id={thread_id}") + next_nodes = tuple(node for node in _snapshot_next(snapshot) if node in ANALYZER_NODE_NAMES) + if next_nodes: + values = values.copy() + values["current"] = next_nodes[0] + values["next_to"] = next_nodes[0] return values -def _invoke_graph(graph: Any, input_state: Optional[Dict[str, Any]], config: Dict[str, Any]) -> Dict[str, Any]: +def _snapshot_next(snapshot: Any) -> Iterable[str]: + next_nodes = getattr(snapshot, "next", None) + return next_nodes or () + + +def get_analyzer_checkpoint_state( + thread_id: str = "analyzer-default", + checkpointer: Any = None, + checkpoint_path: Optional[str] = None, +) -> Dict[str, Any]: + """Return the latest Analyzer checkpoint values for a thread_id.""" + graph = _build_graph(checkpointer, default_store) + config = _build_config(thread_id) + return _checkpoint_values(graph, config, thread_id, checkpoint_path) + + +def _invoke_graph(graph: Any, input_state: Any, config: Dict[str, Any]) -> Dict[str, Any]: try: return graph.invoke(input_state, config=config) except ParentCommand as exc: @@ -76,45 +123,14 @@ def _invoke_graph(graph: Any, input_state: Optional[Dict[str, Any]], config: Dic raise -def _invoke_from_node( - graph: Any, - state: Optional[Dict[str, Any]], - config: Dict[str, Any], - from_node: str, -) -> Dict[str, Any]: - if from_node not in ANALYZER_NODE_NAMES: - available = ", ".join(ANALYZER_NODE_NAMES) - raise ValueError(f"Unknown Analyzer node: {from_node}. Available nodes: {available}") - - checkpoint = _find_checkpoint_before_node(graph, config, from_node) - if checkpoint is not None: - return _invoke_graph(graph, None, checkpoint.config) - - base_state = state or _latest_state_values(graph, config) - if base_state is None: - raise ValueError( - "No checkpoint state found. Pass state for the first run, or run once with the same thread_id before resuming." - ) - - if from_node == "check_required_fields": - return _invoke_graph(graph, base_state, config) - - update_state = getattr(graph, "update_state", None) - if update_state is None: - raise RuntimeError("This LangGraph version does not expose update_state; cannot resume from a node fallback.") - - # Fallback for threads without checkpoint history: write the provided/latest state - # as if the previous node completed, then let LangGraph schedule from_node. - previous_node = _PREVIOUS_NODE_BY_NODE[from_node] - update_state(config, base_state, as_node=previous_node) - return _invoke_graph(graph, None, config) - - def run_analyzer_standalone( state: Optional[Dict[str, Any]], thread_id: str = "analyzer-default", resume: bool = False, from_node: Optional[str] = None, + checkpointer: Any = None, + store: Any = None, + checkpoint_path: Optional[str] = None, ) -> Dict[str, Any]: """Run AnalyzerAgent directly with LangGraph checkpoint controls. @@ -122,15 +138,25 @@ def run_analyzer_standalone( thread_id, checkpoint resume, and optional node-level restart behavior. """ - sg = AnalyzerAgent(checkpointer=checkpointer, store=store) - graph = sg() + checkpointer = checkpointer or default_checkpointer + store = store or default_store + graph = _build_graph(checkpointer, store) config = _build_config(thread_id) - if from_node is not None: - return _invoke_from_node(graph, state, config, from_node) - - if resume: - return _invoke_graph(graph, None, config) + if resume or from_node is not None: + checkpoint_state = _checkpoint_values(graph, config, thread_id, checkpoint_path) + resume_node = _normalize_node_name(from_node) if from_node is not None else _resume_node_from_state(checkpoint_state) + if resume_node not in ANALYZER_NODE_NAMES: + available = ", ".join(ANALYZER_NODE_NAMES) + raise ValueError(f"Unknown Analyzer node: {resume_node}. Available nodes: {available}") + + checkpoint_state = checkpoint_state.copy() + checkpoint_state["current"] = resume_node + checkpoint_state["next_to"] = resume_node + if _state_is_finished(checkpoint_state): + return checkpoint_state + resume_graph = _build_graph(checkpointer, store, entry_point=resume_node) + return _invoke_graph(resume_graph, checkpoint_state, config) if state is None: raise ValueError("state is required when resume is false.") diff --git a/setup.py b/setup.py index 23bb18d..a497725 100644 --- a/setup.py +++ b/setup.py @@ -6,6 +6,7 @@ packages=find_packages(), install_requires=[ "langgraph>=0.6.7", + "langgraph-checkpoint-sqlite>=3.0.0", "colorlog>=6.10.0", "rich>=13.0.0", "langchain>=0.3.27",