diff --git a/.gitignore b/.gitignore
index e699cc17..6b51abdf 100644
--- a/.gitignore
+++ b/.gitignore
@@ -47,6 +47,7 @@ memory/*
 !memory/L4_raw_sessions/
 memory/L4_raw_sessions/*
 !memory/L4_raw_sessions/compress_session.py
+!memory/L4_raw_sessions/session_traceback.py
 
 # ljqCtrl related tools
 !memory/ljqCtrl.py
@@ -101,7 +102,8 @@ memory/L4_raw_sessions/*
 .vs/
 restore_commit.txt
 
-sche_tasks/
+sche_tasks/*
+!sche_tasks/tri_axis_scan.json
 
 # CDP Bridge 密钥配置(首次运行自动生成)
 assets/tmwd_cdp_bridge/config.js
 assets/copilot_proxy.pyw
@@ -113,6 +115,13 @@ reflect/*
 !reflect/scheduler.py
 !reflect/agent_team_worker.py
 !reflect/goal_mode.py
+!reflect/analyzers/
+reflect/analyzers/*
+!reflect/analyzers/__init__.py
+!reflect/analyzers/tri_axis_scanner.py
+!reflect/analyzers/emotion_scanner.py
+!reflect/analyzers/habit_tracker.py
+!reflect/analyzers/abandoned_detector.py
 
 # Universal: never track __pycache__ anywhere
 **/__pycache__/
diff --git a/memory/L4_raw_sessions/session_traceback.py b/memory/L4_raw_sessions/session_traceback.py
new file mode 100644
index 00000000..8ab2f83f
--- /dev/null
+++ b/memory/L4_raw_sessions/session_traceback.py
@@ -0,0 +1,282 @@
+"""L4 Session Traceback Tool — trace a summary line in all_histories.txt back to its full context
+Usage:
+    from session_traceback import traceback
+    result = traceback("你要不断学习不断迭代直到我主动干预喊停")
+    print(result['before'])  # preceding context
+    print(result['match'])   # the matched turn content
+    print(result['after'])   # following context
+"""
+import zipfile, os, re
+from typing import Optional
+
+L4_DIR = os.path.dirname(os.path.abspath(__file__))
+HIST_PATH = os.path.join(L4_DIR, "all_histories.txt")
+
+# ─── internal helpers ───
+
+def _load_history_lines():
+    with open(HIST_PATH, 'r', encoding='utf-8') as f:
+        return f.readlines()
+
+def _find_session(lines: list, target_idx: int) -> Optional[str]:
+    """Walk upward from the target line to the nearest SESSION marker."""
+    for i in range(target_idx, -1, -1):
+        if lines[i].startswith("SESSION: "):
+            return lines[i].strip().replace("SESSION: ", "")
+    return None
+
+def _session_to_zip(session: str) -> str:
+    """Infer the zip path from the session name (MMDD_HHMM-MMDD_HHMM → 2026-MM.zip; the year is currently hardcoded)."""
+    month = session[:2]
+    return os.path.join(L4_DIR, f"2026-{month}.zip")
+
+def _count_occurrence_in_history(lines: list, target_idx: int, session_start_idx: int) -> int:
+    """Count which occurrence of the target line this is within its session (disambiguates short texts)."""
+    target_text = lines[target_idx].strip()
+    count = 0
+    for i in range(session_start_idx, target_idx + 1):
+        if lines[i].strip() == target_text:
+            count += 1
+    return count
+
+def _find_session_start(lines: list, target_idx: int) -> int:
+    """Locate the first line of the session that contains the target line."""
+    for i in range(target_idx, -1, -1):
+        if lines[i].startswith("SESSION: "):
+            return i
+    return 0
+
+def _extract_turn_boundaries(content: str):
+    """Parse a session file and return each turn's boundaries: {'start', 'end', 'user_pos', 'resp_pos'}."""
+    prompt_pattern = re.compile(r'^=== Prompt === .+$', re.MULTILINE)
+    user_pattern = re.compile(r'^=== USER ===$', re.MULTILINE)
+    response_pattern = re.compile(r'^=== Response === .+$', re.MULTILINE)
+
+    prompts = [m.start() for m in prompt_pattern.finditer(content)]
+    turns = []
+    for i, p_start in enumerate(prompts):
+        turn_end = prompts[i+1] if i+1 < len(prompts) else len(content)
+        # locate the USER and Response markers inside this turn
+        segment = content[p_start:turn_end]
+        u_match = user_pattern.search(segment)
+        r_match = response_pattern.search(segment)
+        user_pos = p_start + u_match.start() if u_match else None
+        resp_pos = p_start + r_match.start() if r_match else None
+        turns.append({
+            'start': p_start,
+            'end': turn_end,
+            'user_pos': user_pos,
+            'resp_pos': resp_pos,
+        })
+    return turns
+def _get_user_text(content: str, turn: dict) -> str:
+    """Extract the user text from a turn."""
+    if turn['user_pos'] is None:
+        return ""
+    # the USER text sits after "=== USER ===\n" and before "=== Response ==="
+    start = content.index('\n', turn['user_pos']) + 1
+    end = turn['resp_pos'] if turn['resp_pos'] else turn['end']
+    return content[start:end].strip()
+
+def _get_response_text(content: str, turn: dict) -> str:
+    """Extract the response text from a turn."""
+    if turn['resp_pos'] is None:
+        return ""
+    start = content.index('\n', turn['resp_pos']) + 1
+    return content[start:turn['end']].strip()
+
+
+# ─── main entry ───
+
+def traceback(query: str, context_chars: int = 1500, nth: int = 0) -> dict:
+    """Trace a piece of text from all_histories.txt back to its full context.
+
+    Args:
+        query: text to search for ("[USER]: xxx" / "[Agent] xxx" formats or plain text)
+        context_chars: characters of context to return on each side (default 1500)
+        nth: which match to use when there are several (0-based, default first)
+
+    Returns:
+        dict with keys:
+        - session: session name
+        - zip_file: zip file name
+        - before: preceding context (tail of the previous turn's response + head of this turn's prompt)
+        - match: the matched content (user message or agent reply fragment)
+        - after: following context (this turn's response or the head of the next turn)
+        - turn_index: index of the turn within the session
+        - total_turns: total number of turns in the session
+        - history_context: surrounding lines from all_histories.txt
+    """
+    lines = _load_history_lines()
+
+    # normalize the query: strip known prefixes
+    search_text = query.strip()
+    if search_text.startswith("[USER]: "):
+        search_text = search_text[8:]
+        search_type = "USER"
+    elif search_text.startswith("[Agent] "):
+        search_text = search_text[8:]
+        search_type = "AGENT"
+    else:
+        search_type = "AUTO"
+
+    # Step 1: locate the text in all_histories.txt
+    matches = []
+    for i, line in enumerate(lines):
+        stripped = line.strip()
+        if search_text in stripped:
+            session = _find_session(lines, i)
+            if session:
+                matches.append((i, session, stripped))
+
+    if not matches:
+        return {"error": f"未在 all_histories.txt 中找到: '{search_text[:50]}...'"}
+
+    if nth >= len(matches):
+        return {"error": f"只找到 {len(matches)} 个匹配,但请求第 {nth+1} 个"}
+
+    target_idx, session, hist_line = matches[nth]
+
+    # history context: 3 lines on either side
+    hist_ctx_start = max(0, target_idx - 3)
+    hist_ctx_end = min(len(lines), target_idx + 4)
+    history_context = []
+    for j in range(hist_ctx_start, hist_ctx_end):
+        marker = ">>>" if j == target_idx else "   "
+        history_context.append(f"{marker} {lines[j].rstrip()}")
+
+    # Step 2: locate the zip and the session file
+    zip_path = _session_to_zip(session)
+    target_file = f"{session}.txt"
+
+    if not os.path.exists(zip_path):
+        return {"error": f"ZIP文件不存在: {zip_path}"}
+
+    with zipfile.ZipFile(zip_path, 'r') as zf:
+        if target_file not in zf.namelist():
+            return {"error": f"Session文件 {target_file} 不在 {os.path.basename(zip_path)} 中"}
+        with zf.open(target_file) as f:
+            content = f.read().decode('utf-8', errors='replace')
+
+    # Step 3: locate the text within the session file
+    # figure out which occurrence within the session this is (disambiguates short texts)
+    session_start = _find_session_start(lines, target_idx)
+    occurrence = _count_occurrence_in_history(lines, target_idx, session_start)
+
+    # search strategy: exact first, then degrade
+    if search_type == "USER":
+        # the user's exact words appear after "=== USER ==="
+        search_key = search_text[:80] if len(search_text) > 80 else search_text
+    elif search_type == "AGENT":
+        # agent summaries sit inside dedicated tags in the session file
+        search_key = search_text[:60] if len(search_text) > 60 else search_text
+    else:
+        search_key = search_text[:60] if len(search_text) > 60 else search_text
+
+    # walk to the occurrence-th hit
+    pos = -1
+    start_search = 0
+    for _ in range(occurrence):
+        pos = content.find(search_key, start_search)
+        if pos == -1:
+            break
+        start_search = pos + 1
+
+    if pos == -1:
+        # degrade: retry with ever shorter keys
+        for length in [40, 20, 10]:
+            short_key = search_text[:length]
+            pos = content.find(short_key)
+            if pos >= 0:
+                search_key = short_key  # keep slicing consistent with what was actually found
+                break
f"在session文件中未找到匹配文本", + "session": session, + "zip_file": os.path.basename(zip_path), + "history_context": "\n".join(history_context), + "total_matches_in_history": len(matches), + } + + # Step 4: 提取上下文 + before_start = max(0, pos - context_chars) + after_end = min(len(content), pos + len(search_key) + context_chars) + + before_text = content[before_start:pos] + match_text = content[pos:pos + len(search_key)] + after_text = content[pos + len(search_key):after_end] + + # 边界处理:判断前文/后文是否有实质内容 + # 前文:如果只剩 "=== Prompt === ...\n=== USER ===\n" 这类header,视为无效 + before_stripped = re.sub(r'=== (Prompt|USER|Response) ===[^\n]*\n?', '', before_text).strip() + if len(before_stripped) < 20: + before_text = None + + # 后文:如果剩余内容不足20字符有效文本,视为无效 + after_stripped = re.sub(r'=== (Prompt|USER|Response) ===[^\n]*\n?', '', after_text).strip() + if len(after_stripped) < 20: + after_text = None + + # 解析turn结构获取额外信息 + turns = _extract_turn_boundaries(content) + turn_index = -1 + for ti, turn in enumerate(turns): + if turn['start'] <= pos < turn['end']: + turn_index = ti + break + + return { + "session": session, + "zip_file": os.path.basename(zip_path), + "turn_index": turn_index, + "total_turns": len(turns), + "position": pos, + "file_size": len(content), + "before": before_text, + "match": match_text, + "after": after_text, + "history_context": "\n".join(history_context), + "total_matches_in_history": len(matches), + "selected_match": nth, + } + + +def traceback_pretty(query: str, context_chars: int = 1500, nth: int = 0) -> str: + """格式化输出的溯源结果""" + r = traceback(query, context_chars, nth) + if "error" in r: + return f"❌ {r['error']}\n" + r.get('history_context', '') + + output = [] + output.append(f"{'='*60}") + output.append(f"📍 Session: {r['session']} (Turn {r['turn_index']+1}/{r['total_turns']})") + output.append(f"📦 ZIP: {r['zip_file']} | 位置: {r['position']}/{r['file_size']}") + output.append(f"🔍 History匹配: 第{r['selected_match']+1}/{r['total_matches_in_history']}个") + output.append(f"{'='*60}") + output.append(f"\n--- all_histories.txt 上下文 ---") + output.append(r['history_context']) + + if r['before'] is not None: + output.append(f"\n--- 前文 (最后{len(r['before'])}字符) ---") + output.append(r['before'][-800:]) + else: + output.append(f"\n--- 前文: 无(这是session的开头) ---") + + output.append(f"\n{'>'*20} 匹配内容 {'<'*20}") + output.append(r['match']) + output.append(f"{'>'*20} 匹配结束 {'<'*20}") + + if r['after'] is not None: + output.append(f"\n--- 后文 (前{len(r['after'])}字符) ---") + output.append(r['after'][:800]) + else: + output.append(f"\n--- 后文: 无(这是session的结尾) ---") + + return "\n".join(output) + + +if __name__ == "__main__": + # 测试 + print(traceback_pretty("你要不断学习不断迭代直到我主动干预喊停")) diff --git a/reflect/analyzers/__init__.py b/reflect/analyzers/__init__.py new file mode 100644 index 00000000..73ea9650 --- /dev/null +++ b/reflect/analyzers/__init__.py @@ -0,0 +1,20 @@ +""" +reflect/analyzers - 历史分析模块包 + +语义化命名: +- EmotionScanner: 情绪波动检测(原轴1) +- HabitTracker: 持续活跃模式检测(原轴2) +- AbandonedDetector: 已消失事项检测(原轴3) +- TriAxisScanner: 统一调度器 + +兼容旧名: +- EmotionDetector → EmotionScanner +- TrendDetector (保留) +""" +from .emotion_scanner import EmotionScanner +from .habit_tracker import HabitTracker +from .abandoned_detector import AbandonedDetector +from .tri_axis_scanner import TriAxisScanner + +# 兼容旧名 +EmotionDetector = EmotionScanner diff --git a/reflect/analyzers/abandoned_detector.py b/reflect/analyzers/abandoned_detector.py new file mode 100644 index 00000000..537665f3 --- /dev/null +++ b/reflect/analyzers/abandoned_detector.py 
diff --git a/reflect/analyzers/__init__.py b/reflect/analyzers/__init__.py
new file mode 100644
index 00000000..73ea9650
--- /dev/null
+++ b/reflect/analyzers/__init__.py
@@ -0,0 +1,20 @@
+"""
+reflect/analyzers - history analysis package
+
+Semantic naming:
+- EmotionScanner: emotion-swing detection (formerly axis 1)
+- HabitTracker: sustained-activity detection (formerly axis 2)
+- AbandonedDetector: vanished-topic detection (formerly axis 3)
+- TriAxisScanner: unified dispatcher
+
+Legacy names:
+- EmotionDetector → EmotionScanner (alias kept below)
+- TrendDetector's axis-2/3 logic now lives in HabitTracker / AbandonedDetector
+"""
+from .emotion_scanner import EmotionScanner
+from .habit_tracker import HabitTracker
+from .abandoned_detector import AbandonedDetector
+from .tri_axis_scanner import TriAxisScanner
+
+# legacy alias
+EmotionDetector = EmotionScanner
diff --git a/reflect/analyzers/abandoned_detector.py b/reflect/analyzers/abandoned_detector.py
new file mode 100644
index 00000000..537665f3
--- /dev/null
+++ b/reflect/analyzers/abandoned_detector.py
@@ -0,0 +1,299 @@
+"""Vanished-topic detector (semantic refactor of the old axis 3 / TrendDetector axis3)
+
+Key changes:
+- Semantic name: AbandonedDetector
+- Output carries statistics only (weeks_active, total_count, last_week, gap, pattern)
+- No traceback_query field: the follow-up for vanished topics is comparison against
+  the L2 memory layer, which does not need raw-log tracebacks
+- Works on all_user_histories.txt (user lines only)
+- Reuses HabitTracker's session parsing + tagging + matrix logic; only the judgment differs
+"""
+import sys, json, re, time, os
+from collections import Counter, defaultdict
+
+# unified config
+try:
+    from reflect.analyzers._config import PROJECT_ROOT, HIST_PATH, USER_HIST_PATH, get_llm_config, filter_user_histories
+    from reflect.analyzers._json_utils import robust_json_parse
+except (ImportError, ModuleNotFoundError):
+    from _config import PROJECT_ROOT, HIST_PATH, USER_HIST_PATH, get_llm_config, filter_user_histories
+    from _json_utils import robust_json_parse
+
+from llmcore import LLMSession
+
+# ============================================================
+# Prompts (shared with HabitTracker)
+# ============================================================
+TAG_PROMPT = """你是一个活动标注器。给定用户与AI助手的多个会话摘要,为每个会话标注1-3个活动标签。
+
+## 规则
+- 标签应描述用户主动发起的目标或项目(动词+宾语),如"开发truth_finder"、"学习GA机制"
+- 只标注用户真正想做的事,忽略AI助手的中间执行步骤(如"搜索资料"、"读取文件"、"调试报错"等是手段不是目标)
+- 如果会话内容不明确或太短,标注为"不明确"
+- 每个会话独立标注
+
+## 输出格式
+JSON数组,每条: {"id": N, "tasks": ["标签1", "标签2"]}
+只输出JSON。"""
+
+NORMALIZE_PROMPT = """你是一个标签归一化器。给定一组活动标签(来自用户与AI助手的对话记录),请将含义相同或高度相关的标签合并为统一名称。
+
+## 规则
+- 同一件事的不同表述应合并为一个统一名称(选择最清晰简洁的表述)
+- 例如:"开发truth_finder功能"、"调试truth_finder"、"truth_finder优化" → 统一为 "开发truth_finder"
+- 例如:"记忆整合"、"记忆整理"、"执行记忆整理" → 统一为 "记忆整合"
+- 如果某个标签独立存在、与其他标签无关,保持原样
+- 不要强行合并无关标签
+
+## 输出格式
+JSON对象,key是原始标签,value是归一化后的统一名称:
+{"原标签A": "统一名称", "原标签B": "统一名称", "独立标签C": "独立标签C", ...}
+
+只输出JSON,不要其他内容。"""
+
+
+class AbandonedDetector:
+    """Vanished-topic detector, built on a week x task matrix"""
+
+    BATCH_SIZE = 40  # sessions per LLM tagging batch
+
+    # judgment thresholds
+    MIN_WEEKS = 2        # pattern A: minimum number of active weeks
+    MIN_GAP = 5          # minimum weeks since last seen (must be >= HabitTracker.RECENT_WINDOW to avoid contradicting it)
+    SINGLE_WEEK_MIN = 5  # pattern B: minimum hits within the single week (raised to filter noise)
+
+    def __init__(self, hist_path=None, verbose=True):
+        """
+        Args:
+            hist_path: path to the user history file (should be the filtered all_user_histories.txt)
+            verbose: print progress
+        """
+        self.hist_path = hist_path or USER_HIST_PATH
+        self.verbose = verbose
+        self.sessions = []
+        self.total_weeks = 0
+        self.week_task_matrix = defaultdict(lambda: defaultdict(int))
+        self.task_weeks = defaultdict(set)
+
+    def _parse_sessions(self):
+        """Parse the user history file into sessions."""
+        with open(self.hist_path, 'r', encoding='utf-8') as f:
+            lines = f.readlines()
+
+        current_session = None
+        week_set = set()
+
+        for line in lines:
+            stripped = line.strip()
+            if not stripped:
+                continue
+
+            if stripped.startswith("SESSION:"):
+                if current_session:
+                    self.sessions.append(current_session)
+
+                session_name = stripped[8:].strip()
+                week = self._calc_week(session_name)
+                current_session = {
+                    'name': session_name,
+                    'week': week,
+                    'user_lines': []
+                }
+                week_set.add(week)
+
+            elif stripped.startswith("[USER]:") and current_session is not None:
+                text = stripped[7:].strip()
+                current_session['user_lines'].append(text)
+
+        if current_session:
+            self.sessions.append(current_session)
+
+        self.total_weeks = max(week_set) if week_set else 0
+
+        if self.verbose:
+            print(f"[AbandonedDetector] 解析完成: {len(self.sessions)} sessions, {self.total_weeks} 周")
+
+    def _calc_week(self, session_name):
+        """Map a session name to a week index (approximate month*30+day arithmetic)."""
+        try:
+            start_part = session_name.split('-')[0]
+            month = int(start_part[:2])
+            day = int(start_part[2:4])
+            day_of_year = (month - 1) * 30 + day
+            return day_of_year // 7 + 1
+        except (ValueError, IndexError):
+            return 1
"""调用LLM""" + session = LLMSession(get_llm_config()) + session.system = system_prompt + gen = session.ask(user_prompt) + return ''.join(gen) + + def _extract_tasks_batch(self, batch_sessions): + """批量提取session的活动标签""" + batch_text = "" + for i, s in enumerate(batch_sessions, 1): + summary = " | ".join(s['user_lines'][:5])[:200] + batch_text += f"[{i}] {s['name']}: {summary}\n" + + result = self._call_llm(TAG_PROMPT, f"以下是{len(batch_sessions)}个会话摘要:\n\n{batch_text}") + + # 鲁棒JSON解析 + items = robust_json_parse(result, expect_type="array") + if items: + for item in items: + idx = item.get('id', 0) - 1 + if 0 <= idx < len(batch_sessions): + batch_sessions[idx]['tasks'] = item.get('tasks', []) + return True + return False + + def _tag_all_sessions(self): + """批量标注所有session""" + total_batches = (len(self.sessions) - 1) // self.BATCH_SIZE + 1 + + for batch_idx in range(total_batches): + start = batch_idx * self.BATCH_SIZE + end = min(start + self.BATCH_SIZE, len(self.sessions)) + batch = self.sessions[start:end] + + success = self._extract_tasks_batch(batch) + + if self.verbose: + tagged = sum(1 for s in batch if 'tasks' in s) + status = "OK" if success else "FAIL" + print(f" Batch {batch_idx+1}/{total_batches}: {status} {tagged}/{len(batch)}") + + time.sleep(0.3) + + untagged = [s for s in self.sessions if 'tasks' not in s] + if untagged and self.verbose: + print(f" 重试 {len(untagged)} 未标注sessions...") + self._extract_tasks_batch(untagged) + + tagged_count = sum(1 for s in self.sessions if 'tasks' in s) + if self.verbose: + print(f" 标注完成: {tagged_count}/{len(self.sessions)}") + + def _build_matrix(self): + """构建周x事项矩阵,通过LLM自动归一化同义标签""" + all_tasks = Counter() + for s in self.sessions: + for t in s.get('tasks', []): + all_tasks[t] += 1 + + # 纯LLM归一化:将所有出现>=2次的标签交给LLM聚类 + reverse_map = {} + freq_tasks = [t for t, c in all_tasks.items() if c >= 2] + + if len(freq_tasks) >= 2: + task_list = "\n".join(f"- {t} (出现{all_tasks[t]}次)" for t in freq_tasks) + result = self._call_llm(NORMALIZE_PROMPT, f"标签列表:\n{task_list}") + mappings = robust_json_parse(result, expect_type="object") + if mappings: + for orig, target in mappings.items(): + if isinstance(target, str) and target.strip(): + reverse_map[orig] = target.strip() + + self.week_task_matrix = defaultdict(lambda: defaultdict(int)) + self.task_weeks = defaultdict(set) + + for s in self.sessions: + week = s['week'] + for t in s.get('tasks', []): + unified = reverse_map.get(t, t) + self.week_task_matrix[unified][week] += 1 + self.task_weeks[unified].add(week) + + if self.verbose: + multi_week = sum(1 for ws in self.task_weeks.values() if len(ws) >= 2) + print(f"[AbandonedDetector] 矩阵: {len(self.task_weeks)}事项, {multi_week}个跨>=2周") + + def detect(self, llm_client=None): + """ + 执行完整pipeline: 解析→标注→矩阵→判定 + + Returns: [{ + "task": str, + "weeks_active": [int], + "total_count": int, + "last_week": int, + "gap": int, + "pattern": "multi_week" | "single_burst" + }] + + 注意:不含traceback_query,因为消失事项的下一步是对接L2记忆层比较 + """ + # Phase 1: 解析sessions + self._parse_sessions() + + # Phase 2-3: LLM标注 + self._tag_all_sessions() + + # Phase 4: 构建矩阵 + self._build_matrix() + + # Phase 5: 判定 + current_week = self.total_weeks + abandoned = [] + + for task, weeks in self.task_weeks.items(): + if task in ("不明确",): + continue + + weeks_sorted = sorted(weeks) + num_weeks = len(weeks_sorted) + span = weeks_sorted[-1] - weeks_sorted[0] + 1 + last_week = weeks_sorted[-1] + gap = current_week - last_week + total_count = sum(self.week_task_matrix[task].values()) + + # 模式A: 出现>=2周 + 距今>=3周 + 
+            # pattern A: active in >= MIN_WEEKS distinct weeks, last seen >= MIN_GAP weeks ago
+            is_pattern_a = (num_weeks >= self.MIN_WEEKS and gap >= self.MIN_GAP)
+            # pattern B: a single week with >= SINGLE_WEEK_MIN hits, last seen >= MIN_GAP weeks ago
+            is_pattern_b = (num_weeks == 1 and total_count >= self.SINGLE_WEEK_MIN
+                            and gap >= self.MIN_GAP)
+
+            if is_pattern_a or is_pattern_b:
+                abandoned.append({
+                    "task": task,
+                    "weeks_active": weeks_sorted,
+                    "total_count": total_count,
+                    "last_week": last_week,
+                    "gap": gap,
+                    "pattern": "multi_week" if is_pattern_a else "single_burst"
+                })
+
+        # most frequent first
+        abandoned.sort(key=lambda x: -x['total_count'])
+
+        if self.verbose:
+            print(f"\n[AbandonedDetector] 检出 {len(abandoned)} 个消失事项:")
+            for item in abandoned[:15]:
+                weeks_str = ",".join(f"W{w}" for w in item['weeks_active'])
+                print(f"  \u2717 {item['task']}: {item['total_count']}次 ({weeks_str}) "
+                      f"距今{item['gap']}周 [{item['pattern']}]")
+
+        return abandoned
+
+    def run(self, llm_client=None):
+        """Main entry; kept for the old interface."""
+        return self.detect(llm_client)
+
+
+# ============================================================
+# CLI entry
+# ============================================================
+if __name__ == "__main__":
+    # make sure the user history file exists
+    filter_user_histories()
+
+    detector = AbandonedDetector()
+    results = detector.detect()
+
+    # save results
+    output_path = os.path.join(os.path.dirname(__file__), "abandoned_results.json")
+    with open(output_path, 'w', encoding='utf-8') as f:
+        json.dump(results, f, ensure_ascii=False, indent=2, default=list)
+    print(f"\n结果已保存: {output_path}")
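To make the two judgment patterns concrete, here is a minimal, self-contained sketch of the same rule with the thresholds copied from the class; the week numbers and counts are invented test data:

```python
# Standalone restatement of AbandonedDetector's Phase-5 judgment (illustrative data).
MIN_WEEKS, MIN_GAP, SINGLE_WEEK_MIN = 2, 5, 5
CURRENT_WEEK = 20

def classify(weeks_active, total_count):
    """Return the matched pattern name, or None if the task is not 'abandoned'."""
    gap = CURRENT_WEEK - max(weeks_active)
    if len(weeks_active) >= MIN_WEEKS and gap >= MIN_GAP:
        return "multi_week"
    if len(weeks_active) == 1 and total_count >= SINGLE_WEEK_MIN and gap >= MIN_GAP:
        return "single_burst"
    return None

print(classify([3, 4, 6], total_count=9))   # multi_week: 3 active weeks, gap 14
print(classify([10], total_count=7))        # single_burst: one week, 7 hits, gap 10
print(classify([18, 19], total_count=4))    # None: gap 1 < MIN_GAP, still active
```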
diff --git a/reflect/analyzers/emotion_scanner.py b/reflect/analyzers/emotion_scanner.py
new file mode 100644
index 00000000..e5499ce6
--- /dev/null
+++ b/reflect/analyzers/emotion_scanner.py
@@ -0,0 +1,364 @@
+"""Emotion scanner (semantic refactor of the old axis 1 / EmotionDetector)
+
+Key changes:
+- Semantic name: EmotionScanner
+- Output gains traceback_query + occurrence_nth fields, ready to feed straight into session_traceback
+- Works on all_user_histories.txt (user lines only)
+- Keeps the full LLM batching + sliding-window clustering logic
+"""
+import sys, json, re, time, os
+from collections import Counter
+
+# unified config
+try:
+    from reflect.analyzers._config import PROJECT_ROOT, HIST_PATH, USER_HIST_PATH, get_llm_config, filter_user_histories
+    from reflect.analyzers._json_utils import robust_json_parse
+except (ImportError, ModuleNotFoundError):
+    from _config import PROJECT_ROOT, HIST_PATH, USER_HIST_PATH, get_llm_config, filter_user_histories
+    from _json_utils import robust_json_parse
+
+from llmcore import LLMSession
+
+# ============================================================
+# System prompt (validated on real data, zero false positives)
+# ============================================================
+SYSTEM_PROMPT = """你是一个精确的情绪波动检测器。你的任务是找出用户在与AI助手交互中"失去冷静"的瞬间——即出现了超出正常沟通所需的情绪化表达。
+
+## 核心原则
+正常的纠正、指出错误、提出要求都是冷静的沟通行为,不算情绪波动。
+只有当用户的表达方式本身变得情绪化、不必要地激烈、或明显超出理性沟通所需时,才标记。
+
+## NEGATIVE(用户情绪超出冷静沟通范围)
+满足以下任一条件:
+1. 【强】表达方式明显情绪化,出现不必要的激烈语言(confidence 4-5)
+2. 【中】语气中带有明显的不耐烦、挫败或嘲讽,虽未爆发但已偏离理性沟通(confidence 2-3)
+
+强信号(conf 4-5):
+- 累积挫败后的爆发("我都说了多少遍了"、"你到底有没有在听"、"我真的服了")
+- 讽刺/反问攻击("你是不是根本不会"、"这也叫完成了?")
+- 夸张化表达("每次都是这样"、"从来没有一次对的"、"完全是在浪费时间")
+- 情绪词/语气词堆叠("真的很无语"、"太离谱了")
+- 威胁/最后通牒语气("再这样我就不用了")
+- 人身化批评("你太笨了"、"你是不是傻")
+
+中信号(conf 2-3):
+- 反复纠正同一问题后语气变得生硬/急促("不是!是X!"、"我说的是Y不是Z!")
+- 用反问句表达对AI能力的质疑("你有认真看吗"、"这不是很明显吗")
+- 命令语气突然加重,带有"你给我..."、"你必须..."等强制性表达
+- 用"又"、"还是"、"依然"等词暗示AI反复犯同样错误时带有不满语气
+- 明确表达失望但未爆发("我本来以为你能..."、"算了我自己来"带叹气感)
+
+弱信号(conf 1-2):
+- 冷淡地放弃/跳过("算了不弄了"、"跳过这个吧"语气中隐含对AI无能的失望)
+- 突然变得简短冷淡(之前详细交流,突然只回"行"、"随便"、"你看着办"暗示不想再解释)
+- 带有轻微讽刺的确认("好吧好吧"、"行吧你说的对"明显敷衍)
+- 不得不降低期望("那就先这样吧"、"凑合用吧"暗示不满意但放弃争取)
+- 第二次以上纠正同一类错误,虽然语气平静但能感受到耐心在消耗
+
+## POSITIVE(用户情绪化地表达惊喜/兴奋)
+同样要求表达超出正常确认:
+- 惊喜爆发("卧槽这也行"、"太牛了吧"、"我靠完美")
+- 兴奋到语无伦次或用大量感叹号
+- 超预期的强烈赞美(不是简单的"不错"而是"这简直是神作")
+
+## NEUTRAL(默认——绝大多数发言应该是这个)
+以下全部是NEUTRAL,无论内容多么"负面":
+- 冷静地指出错误("这里不对,应该是X")→ 正常沟通
+- 冷静地纠正理解("我指的不是这个")→ 正常沟通
+- 冷静地表达不满意("这个方案不太行,换一个")→ 正常沟通
+- 要求重做("重新来"、"不是这样的")→ 正常沟通
+- 简单确认/赞同("好的"、"可以"、"不错"、"做得好")→ 正常沟通
+- 描述问题/bug → 正常沟通
+- 布置任务/给信息 → 正常沟通
+- 对自己的代码/产品不满 → 与AI无关
+
+## 判断技巧
+问自己:如果把这句话的"情绪化修饰"去掉,信息量会减少吗?
+- 如果去掉情绪化部分后信息完整 → 说明情绪化是多余的 → 标记
+- 如果表达本身就是在传递信息 → 正常沟通 → NEUTRAL
+
+## 输出格式
+仅输出JSON数组,每条非NEUTRAL的结果:
+[{"id": N, "label": "NEGATIVE"|"POSITIVE", "confidence": 1-5, "reason": "一句话理由"}]
+如果全部NEUTRAL,输出空数组: []"""
+
+
+class EmotionScanner:
+    """LLM emotion detection + swing clustering + traceback info"""
+
+    BATCH_SIZE = 20        # lines per LLM batch
+    WINDOW_SIZE = 50       # sliding-window size (lines)
+    TIER1_THRESHOLD = 12   # swing-score threshold (density x mean confidence)
+    TIER2_MIN_CONF = 5     # minimum confidence for isolated high-intensity hits
+
+    def __init__(self, hist_path=None, verbose=True):
+        """
+        Args:
+            hist_path: path to the user history file (should be the filtered all_user_histories.txt);
+                       defaults to USER_HIST_PATH when None
+            verbose: print progress
+        """
+        self.hist_path = hist_path or USER_HIST_PATH
+        self.verbose = verbose
+        # counter used to compute occurrence_nth
+        self._text_occurrence_counter = Counter()
+
+    def _load_user_lines(self, start_line=None, end_line=None):
+        """Load the USER lines in the given range as [(line_no, text), ...]."""
+        with open(self.hist_path, 'r', encoding='utf-8') as f:
+            lines = f.readlines()
+
+        start = (start_line or 1) - 1
+        end = end_line or len(lines)
+
+        user_lines = []
+        for i, line in enumerate(lines[start:end], start=start+1):
+            stripped = line.strip()
+            if stripped.startswith("[USER]:"):
+                user_lines.append((i, stripped[:300]))  # truncate overlong lines
+        return user_lines
+
+    def _build_traceback_info(self, text, line_no):
+        """Build traceback info that can be passed straight to session_traceback.traceback().
+
+        Args:
+            text: raw user line (with the [USER]: prefix)
+            line_no: line number in the history file
+
+        Returns:
+            dict with traceback_query (text without the [USER]: prefix) and occurrence_nth
+        """
+        # strip the [USER]: prefix to get the plain query
+        query = text
+        if query.startswith("[USER]:"):
+            query = query[7:].strip()
+
+        # count which occurrence of this exact text this is (nth)
+        nth = self._text_occurrence_counter[query]
+        self._text_occurrence_counter[query] += 1
+
+        return {
+            "traceback_query": query,
+            "occurrence_nth": nth
+        }
+
+    def _call_llm(self, batch_text, llm_client=None):
+        """Run the LLM emotion analysis and return the parsed list."""
+        if llm_client:
+            session = llm_client
+        else:
+            session = LLMSession(get_llm_config())
+            session.system = SYSTEM_PROMPT
+
+        gen = session.ask(f"分析以下用户发言:\n\n{batch_text}")
+        result = ''.join(gen)
+
+        # robust JSON parsing
+        items = robust_json_parse(result, expect_type="array")
+        if items is None:
+            if self.verbose:
+                print(f"  [WARN] JSON解析失败: {result[:200]}")
+            return []
+        return items
"traceback_query": str, # 可直接传给session_traceback + "occurrence_nth": int # 第几次出现,传给nth参数 + }] + """ + user_lines = self._load_user_lines(start_line, end_line) + if self.verbose: + print(f"[EmotionScanner] 加载 {len(user_lines)} 条USER行 (L{start_line or 1}~L{end_line or 'END'})") + + # 重置计数器 + self._text_occurrence_counter = Counter() + all_detections = [] + + # 分批处理 + for batch_start in range(0, len(user_lines), self.BATCH_SIZE): + batch = user_lines[batch_start:batch_start + self.BATCH_SIZE] + batch_text = "\n".join([f"[{i+1}] {text}" for i, (ln, text) in enumerate(batch)]) + + results = self._call_llm(batch_text, llm_client) + + for item in results: + idx = item.get('id', 0) - 1 + if 0 <= idx < len(batch): + ln, text = batch[idx] + tb_info = self._build_traceback_info(text, ln) + all_detections.append({ + "line_no": ln, + "label": item['label'], + "confidence": item['confidence'], + "reason": item['reason'], + "text": text, + "traceback_query": tb_info['traceback_query'], + "occurrence_nth": tb_info['occurrence_nth'] + }) + + if self.verbose: + neg_count = sum(1 for r in results if r.get('label') == 'NEGATIVE') + print(f" 批次 {batch_start//self.BATCH_SIZE + 1}/{(len(user_lines)-1)//self.BATCH_SIZE + 1}: " + f"{neg_count}条负面 / {len(batch)}条") + + time.sleep(0.3) # 避免rate limit + + return all_detections + + def cluster(self, detections): + """ + 对检测结果进行波动聚类分析 + Returns: {"tier1_clusters": [...], "tier2_isolated": [...]} + """ + negatives = sorted([d for d in detections if d['label'] == 'NEGATIVE'], + key=lambda x: x['line_no']) + + if not negatives: + return {"tier1_clusters": [], "tier2_isolated": []} + + # 滑动窗口聚类 + windows = [] + i = 0 + while i < len(negatives): + window_start = negatives[i]['line_no'] + window_end = window_start + self.WINDOW_SIZE + + # 收集窗口内的所有负面检出 + window_items = [] + j = i + while j < len(negatives) and negatives[j]['line_no'] <= window_end: + window_items.append(negatives[j]) + j += 1 + + if len(window_items) >= 2: # 至少2条才算聚类 + avg_conf = sum(item['confidence'] for item in window_items) / len(window_items) + score = len(window_items) * avg_conf + windows.append({ + "start": window_items[0]['line_no'], + "end": window_items[-1]['line_no'], + "count": len(window_items), + "avg_conf": round(avg_conf, 1), + "score": round(score, 1), + "items": window_items + }) + i = j # 跳过已处理的 + else: + i += 1 + + # 去重合并重叠窗口 + merged = [] + for w in sorted(windows, key=lambda x: -x['score']): + overlap = False + for m in merged: + if not (w['end'] < m['start'] - 10 or w['start'] > m['end'] + 10): + overlap = True + break + if not overlap: + merged.append(w) + + # 分级 + tier1 = sorted([w for w in merged if w['score'] >= self.TIER1_THRESHOLD], + key=lambda x: -x['score']) + + # 找孤立高强度点(conf=5且不在任何cluster中) + clustered_lines = set() + for w in merged: + for item in w['items']: + clustered_lines.add(item['line_no']) + + tier2 = [d for d in negatives + if d['confidence'] >= self.TIER2_MIN_CONF + and d['line_no'] not in clustered_lines] + + return {"tier1_clusters": tier1, "tier2_isolated": tier2} + + def run(self, start_line=None, end_line=None, llm_client=None): + """ + 完整流程: 检测 -> 聚类 -> 输出 + Returns: 完整结果字典 + """ + # Step 1: LLM检测 + detections = self.detect(start_line, end_line, llm_client) + + # Step 2: 聚类分析 + clusters = self.cluster(detections) + + # Step 3: 统计 + user_lines = self._load_user_lines(start_line, end_line) + neg_count = sum(1 for d in detections if d['label'] == 'NEGATIVE') + pos_count = sum(1 for d in detections if d['label'] == 'POSITIVE') + + result = { + "tier1_clusters": 
+    def run(self, start_line=None, end_line=None, llm_client=None):
+        """Full pipeline: detect -> cluster -> report.
+        Returns: the full result dict
+        """
+        # Step 1: LLM detection
+        detections = self.detect(start_line, end_line, llm_client)
+
+        # Step 2: clustering
+        clusters = self.cluster(detections)
+
+        # Step 3: stats
+        user_lines = self._load_user_lines(start_line, end_line)
+        neg_count = sum(1 for d in detections if d['label'] == 'NEGATIVE')
+        pos_count = sum(1 for d in detections if d['label'] == 'POSITIVE')
+
+        result = {
+            "tier1_clusters": clusters['tier1_clusters'],
+            "tier2_isolated": clusters['tier2_isolated'],
+            "all_detections": detections,
+            "stats": {
+                "total_user_lines": len(user_lines),
+                "total_negative": neg_count,
+                "total_positive": pos_count,
+                "detection_rate": round(neg_count / max(len(user_lines), 1) * 100, 1),
+                "tier1_count": len(clusters['tier1_clusters']),
+                "tier2_count": len(clusters['tier2_isolated']),
+                "deep_dive_positions": len(clusters['tier1_clusters']) + len(clusters['tier2_isolated'])
+            }
+        }
+
+        if self.verbose:
+            print("\n[EmotionScanner 结果汇总]")
+            print(f"  总USER行: {result['stats']['total_user_lines']}")
+            print(f"  负面检出: {neg_count} ({result['stats']['detection_rate']}%)")
+            print(f"  Tier1爆发区: {result['stats']['tier1_count']}个")
+            print(f"  Tier2孤立高强度: {result['stats']['tier2_count']}个")
+            print(f"  需深挖位置: {result['stats']['deep_dive_positions']}个")
+
+        return result
+
+
+# ============================================================
+# CLI entry
+# ============================================================
+if __name__ == "__main__":
+    import argparse
+    parser = argparse.ArgumentParser(description="情绪扫描器")
+    parser.add_argument("--start", type=int, default=None, help="起始行号")
+    parser.add_argument("--end", type=int, default=None, help="结束行号")
+    parser.add_argument("--output", type=str, default="emotion_results.json", help="输出文件")
+    parser.add_argument("--quiet", action="store_true", help="静默模式")
+    args = parser.parse_args()
+
+    # make sure the user history file exists
+    filter_user_histories()
+
+    scanner = EmotionScanner(verbose=not args.quiet)
+    results = scanner.run(start_line=args.start, end_line=args.end)
+
+    # save results
+    output = {
+        "stats": results['stats'],
+        "tier1_clusters": [{
+            "start": c['start'], "end": c['end'],
+            "count": c['count'], "score": c['score'],
+            "items": [{
+                "text": item['text'][:100],
+                "traceback_query": item['traceback_query'][:100],
+                "occurrence_nth": item['occurrence_nth'],
+                "confidence": item['confidence']
+            } for item in c['items'][:5]]
+        } for c in results['tier1_clusters']],
+        "tier2_isolated": [{
+            "line_no": d['line_no'], "confidence": d['confidence'],
+            "text": d['text'][:100], "reason": d['reason'],
+            "traceback_query": d['traceback_query'][:100],
+            "occurrence_nth": d['occurrence_nth']
+        } for d in results['tier2_isolated']]
+    }
+
+    with open(args.output, 'w', encoding='utf-8') as f:
+        json.dump(output, f, ensure_ascii=False, indent=2)
+    print(f"\n结果已保存到: {args.output}")
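The Tier-1 rule is simple arithmetic once the window is fixed, so a tiny worked sketch helps: within one 50-line window, score = (number of NEGATIVE hits) × (their mean confidence), and the window becomes an "outburst zone" when the score reaches `TIER1_THRESHOLD` (12). The detection data below is invented:

```python
# Worked example of EmotionScanner's window scoring (invented detections).
detections = [
    {"line_no": 101, "confidence": 3},
    {"line_no": 120, "confidence": 4},
    {"line_no": 142, "confidence": 4},
]
count = len(detections)
avg_conf = sum(d["confidence"] for d in detections) / count
score = count * avg_conf
print(f"count={count} avg_conf={avg_conf:.1f} score={score:.1f} tier1={score >= 12}")
# count=3 avg_conf=3.7 score=11.0 tier1=False -> one more conf-4 hit would cross the threshold
```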
"""你是一个活动标注器。给定用户与AI助手的多个会话摘要,为每个会话标注1-3个活动标签。 + +## 规则 +- 标签应描述用户主动发起的目标或项目(动词+宾语),如"开发truth_finder"、"学习GA机制" +- 只标注用户真正想做的事,忽略AI助手的中间执行步骤(如"搜索资料"、"读取文件"、"调试报错"等是手段不是目标) +- 如果会话内容不明确或太短,标注为"不明确" +- 每个会话独立标注 + +## 输出格式 +JSON数组,每条: {"id": N, "tasks": ["标签1", "标签2"]} +只输出JSON。""" + +NORMALIZE_PROMPT = """你是一个标签归一化器。给定一组活动标签(来自用户与AI助手的对话记录),请将含义相同或高度相关的标签合并为统一名称。 + +## 规则 +- 同一件事的不同表述应合并为一个统一名称(选择最清晰简洁的表述) +- 例如:"开发truth_finder功能"、"调试truth_finder"、"truth_finder优化" → 统一为 "开发truth_finder" +- 例如:"记忆整合"、"记忆整理"、"执行记忆整理" → 统一为 "记忆整合" +- 如果某个标签独立存在、与其他标签无关,保持原样 +- 不要强行合并无关标签 + +## 输出格式 +JSON对象,key是原始标签,value是归一化后的统一名称: +{{"原标签A": "统一名称", "原标签B": "统一名称", "独立标签C": "独立标签C", ...}} + +只输出JSON,不要其他内容。""" + + +class HabitTracker: + """高频习惯追踪器 - 基于周x事项矩阵 + 溯源""" + + BATCH_SIZE = 40 # 每批送LLM标注的session数 + + # 判定阈值 (放宽版, 经602 sessions验证) + MIN_WEEKS = 2 # 最少出现周数 + MIN_SPAN = 2 # 最少跨越周数 + RECENT_WINDOW = 5 # 最近N周内有出现才算活跃(放宽以覆盖间歇性习惯) + + def __init__(self, hist_path=None, verbose=True): + """ + Args: + hist_path: 用户历史文件路径(应为过滤后的all_user_histories.txt) + verbose: 是否打印进度 + """ + self.hist_path = hist_path or USER_HIST_PATH + self.verbose = verbose + self.sessions = [] + self.total_weeks = 0 + self.week_task_matrix = defaultdict(lambda: defaultdict(int)) + self.task_weeks = defaultdict(set) + # 存储每个session的原始USER行,用于溯源 + self._session_user_lines = {} # session_name -> [user_line_text, ...] + + def _parse_sessions(self): + """解析用户历史文件,提取sessions和对应的USER行""" + with open(self.hist_path, 'r', encoding='utf-8') as f: + lines = f.readlines() + + current_session = None + current_lines = [] + week_set = set() + + for line in lines: + stripped = line.strip() + if not stripped: + continue + + if stripped.startswith("SESSION:"): + # 保存上一个session + if current_session: + self.sessions.append(current_session) + self._session_user_lines[current_session['name']] = current_lines + + # 解析新session + session_name = stripped[8:].strip() + # 从session名提取周信息 (格式: MMDD_HHMM-MMDD_HHMM) + week = self._calc_week(session_name) + current_session = { + 'name': session_name, + 'week': week, + 'user_lines': [] + } + current_lines = [] + week_set.add(week) + + elif stripped.startswith("[USER]:") and current_session is not None: + text = stripped[7:].strip() + current_session['user_lines'].append(text) + current_lines.append(text) + + # 最后一个session + if current_session: + self.sessions.append(current_session) + self._session_user_lines[current_session['name']] = current_lines + + self.total_weeks = max(week_set) if week_set else 0 + + if self.verbose: + print(f"[HabitTracker] 解析完成: {len(self.sessions)} sessions, {self.total_weeks} 周") + + def _calc_week(self, session_name): + """从session名计算属于第几周(简化:按月日推算)""" + try: + # 格式: MMDD_HHMM-MMDD_HHMM + start_part = session_name.split('-')[0] + month = int(start_part[:2]) + day = int(start_part[2:4]) + # 简化周计算:以1月1日为基准 + day_of_year = (month - 1) * 30 + day + return day_of_year // 7 + 1 + except (ValueError, IndexError): + return 1 + + def _call_llm(self, system_prompt, user_prompt): + """调用LLM""" + session = LLMSession(get_llm_config()) + session.system = system_prompt + gen = session.ask(user_prompt) + return ''.join(gen) + + def _extract_tasks_batch(self, batch_sessions): + """批量提取session的活动标签""" + batch_text = "" + for i, s in enumerate(batch_sessions, 1): + summary = " | ".join(s['user_lines'][:5])[:200] + batch_text += f"[{i}] {s['name']}: {summary}\n" + + result = self._call_llm(TAG_PROMPT, f"以下是{len(batch_sessions)}个会话摘要:\n\n{batch_text}") + + # 鲁棒JSON解析 + items = robust_json_parse(result, 
+    def _call_llm(self, system_prompt, user_prompt):
+        """Call the LLM."""
+        session = LLMSession(get_llm_config())
+        session.system = system_prompt
+        gen = session.ask(user_prompt)
+        return ''.join(gen)
+
+    def _extract_tasks_batch(self, batch_sessions):
+        """Batch-tag a group of sessions with activity labels."""
+        batch_text = ""
+        for i, s in enumerate(batch_sessions, 1):
+            summary = " | ".join(s['user_lines'][:5])[:200]
+            batch_text += f"[{i}] {s['name']}: {summary}\n"
+
+        result = self._call_llm(TAG_PROMPT, f"以下是{len(batch_sessions)}个会话摘要:\n\n{batch_text}")
+
+        # robust JSON parsing
+        items = robust_json_parse(result, expect_type="array")
+        if items:
+            for item in items:
+                idx = item.get('id', 0) - 1
+                if 0 <= idx < len(batch_sessions):
+                    batch_sessions[idx]['tasks'] = item.get('tasks', [])
+            return True
+        return False
+
+    def _tag_all_sessions(self):
+        """Tag every session, batch by batch."""
+        total_batches = (len(self.sessions) - 1) // self.BATCH_SIZE + 1
+
+        for batch_idx in range(total_batches):
+            start = batch_idx * self.BATCH_SIZE
+            end = min(start + self.BATCH_SIZE, len(self.sessions))
+            batch = self.sessions[start:end]
+
+            success = self._extract_tasks_batch(batch)
+
+            if self.verbose:
+                tagged = sum(1 for s in batch if 'tasks' in s)
+                status = "OK" if success else "FAIL"
+                print(f"  Batch {batch_idx+1}/{total_batches}: {status} {tagged}/{len(batch)}")
+
+            time.sleep(0.3)
+
+        # retry whatever is still untagged (independently of verbose mode)
+        untagged = [s for s in self.sessions if 'tasks' not in s]
+        if untagged:
+            if self.verbose:
+                print(f"  重试 {len(untagged)} 未标注sessions...")
+            self._extract_tasks_batch(untagged)
+
+        tagged_count = sum(1 for s in self.sessions if 'tasks' in s)
+        if self.verbose:
+            print(f"  标注完成: {tagged_count}/{len(self.sessions)}")
+
+    def _build_matrix(self):
+        """Build the week x task matrix, LLM-normalizing synonymous labels."""
+        # tally every label
+        all_tasks = Counter()
+        for s in self.sessions:
+            for t in s.get('tasks', []):
+                all_tasks[t] += 1
+
+        # pure-LLM normalization: hand every label seen >= 2 times to the LLM for clustering
+        reverse_map = {}
+        freq_tasks = [t for t, c in all_tasks.items() if c >= 2]
+
+        if len(freq_tasks) >= 2:
+            # let the LLM discover and merge synonymous labels on its own
+            task_list = "\n".join(f"- {t} (出现{all_tasks[t]}次)" for t in freq_tasks)
+            result = self._call_llm(NORMALIZE_PROMPT, f"标签列表:\n{task_list}")
+            mappings = robust_json_parse(result, expect_type="object")
+            if mappings:
+                for orig, target in mappings.items():
+                    if isinstance(target, str) and target.strip():
+                        reverse_map[orig] = target.strip()
+
+        # build the matrix
+        self.week_task_matrix = defaultdict(lambda: defaultdict(int))
+        self.task_weeks = defaultdict(set)
+
+        for s in self.sessions:
+            week = s['week']
+            for t in s.get('tasks', []):
+                unified = reverse_map.get(t, t)
+                self.week_task_matrix[unified][week] += 1
+                self.task_weeks[unified].add(week)
+
+        # keep reverse_map for tracebacks
+        self._reverse_map = reverse_map
+
+        if self.verbose:
+            multi_week = sum(1 for ws in self.task_weeks.values() if len(ws) >= 2)
+            print(f"[HabitTracker] 矩阵: {len(self.task_weeks)}事项, {multi_week}个跨>=2周")
+ """ + # === 关键词提取 === + # 步骤: 分隔符拆 → 中英文边界拆 → 停用词过滤 → 中文bigram扩展 + # 停用词: 高频无区分度的通用词,过滤后提升匹配精度 + _STOPWORDS = {'项目', '功能', '内容', '方案', '工具', '系统', '文件', '代码', + '问题', '东西', '了解', '学习', '研究', '调研', '查看', '检查', + '测试', '使用', '配置', '设计', '开发', '优化', '分析', '处理', + '实现', '运行', '执行', '操作', '帮我', '一下', '一个', '什么', + '怎么', '这个', '那个', '可以', '需要', '进行'} + + # 子进程消息模式 (agent内部调度产生的,非用户真实输入) + _SUBPROCESS_PATTERNS = [ + r'^SOP:', r'^你是.{0,30}子任务执行者', + r'^你是DeepResearch', r'^\[System\]', + r'^## 任务', r'^Phase\d+', + ] + + parts = re.split(r'[/\s\-_,。、]', task) + keywords = [] + for part in parts: + sub_parts = re.split(r'(?<=[a-zA-Z0-9])(?=[\u4e00-\u9fff])|(?<=[\u4e00-\u9fff])(?=[a-zA-Z0-9])', part) + for w in sub_parts: + if re.match(r'^[a-zA-Z0-9]+$', w): + if len(w) >= 1: + keywords.append(w) + elif len(w) >= 2: + keywords.append(w) + + # 过滤停用词 + keywords = [kw for kw in keywords if kw not in _STOPWORDS] + + # 中文长词(>2字)展开为bigram,增加部分匹配能力 + # 例: "新趋势" → ["新趋势", "新趋", "趋势"] + expanded = [] + for kw in keywords: + expanded.append(kw) + if not re.match(r'^[a-zA-Z0-9]+$', kw) and len(kw) > 2: + for i in range(len(kw) - 1): + bigram = kw[i:i+2] + if bigram not in expanded: + expanded.append(bigram) + keywords = expanded + + # 去重(保序) + seen = set() + unique_kws = [] + for kw in keywords: + kl = kw.lower() + if kl not in seen: + seen.add(kl) + unique_kws.append(kw) + keywords = unique_kws + + # 匹配阈值: 至少命中2个关键词才算精确匹配 + # 若关键词总数不足2,则要求全部命中 + min_match = min(2, len(keywords)) + + source_lines = [] + for s in sessions: + if len(source_lines) >= max_total: + break + session_name = s.get('name', '') + user_lines = self._session_user_lines.get(session_name, []) + if not user_lines and 'user_lines' in s: + user_lines = s['user_lines'] + if not user_lines: + continue + + # 过滤子进程消息 + clean_lines = [] + for idx, line_text in enumerate(user_lines): + is_subprocess = any(re.search(pat, line_text) for pat in _SUBPROCESS_PATTERNS) + if not is_subprocess: + clean_lines.append((idx, line_text)) + + # 多关键词匹配: 统计每行命中的关键词数 + matched = [] + for idx, line_text in clean_lines: + line_lower = line_text.lower() + hits = sum(1 for kw in keywords if kw.lower() in line_lower) + if hits >= min_match: + matched.append((idx, line_text, hits)) + + # 按命中数降序排列,优先取最相关的行 + matched.sort(key=lambda x: -x[2]) + + collected = 0 + if matched: + for idx, line_text, _ in matched: + if collected >= max_per_session: + break + source_lines.append({ + "text": line_text, + "session": session_name + }) + collected += 1 + else: + # 无精确匹配(LLM从语义判断的),fallback取首条有意义的行 + # 过滤掉长度<=4或纯寒暄的行(如"你好"、"嗯"、"好的"等) + _GREETINGS = {'你好', '嗯', '好的', '好', '谢谢', '感谢', '是的', '对', + '可以', '行', '没问题', 'hi', 'hello', '嗨', '在吗'} + for _, line_text in clean_lines: + stripped_text = line_text.strip() + if len(stripped_text) > 4 and stripped_text not in _GREETINGS: + source_lines.append({ + "text": line_text, + "session": session_name + }) + break + + return source_lines[:max_total] + + def detect(self, llm_client=None): + """ + 执行完整pipeline: 解析→标注→矩阵→判定 + + Returns: [{ + "task": str, + "weeks_active": [int], + "total_count": int, + "span": int, + "weekly_counts": {week: count}, + "source_lines": [{"text": str, "session": str}] # 溯源用 + }] + """ + # Phase 1: 解析sessions + self._parse_sessions() + + # Phase 2-3: LLM标注 + self._tag_all_sessions() + + # Phase 4: 构建矩阵 + self._build_matrix() + + # Phase 5: 判定 + current_week = self.total_weeks + habits = [] + + for task, weeks in self.task_weeks.items(): + if task in ("不明确",): + continue + + weeks_sorted = sorted(weeks) + num_weeks = 
len(weeks_sorted)
+            span = weeks_sorted[-1] - weeks_sorted[0] + 1
+            last_week = weeks_sorted[-1]
+            gap = current_week - last_week
+            total_count = sum(self.week_task_matrix[task].values())
+
+            # judgment: active in >= MIN_WEEKS weeks + span >= MIN_SPAN + seen within the last RECENT_WINDOW weeks
+            is_habit = (num_weeks >= self.MIN_WEEKS and
+                        span >= self.MIN_SPAN and
+                        gap < self.RECENT_WINDOW)
+
+            if is_habit:
+                # traceback info: collect the sessions whose normalized tags match this task
+                matched_sessions = []
+                for s in self.sessions:
+                    for t in s.get('tasks', []):
+                        unified = self._reverse_map.get(t, t)
+                        if unified == task:
+                            matched_sessions.append(s)
+                            break
+
+                source_lines = self._collect_source_lines(task, matched_sessions)
+
+                habits.append({
+                    "task": task,
+                    "weeks_active": weeks_sorted,
+                    "total_count": total_count,
+                    "span": span,
+                    "weekly_counts": dict(self.week_task_matrix[task]),
+                    "source_lines": source_lines
+                })
+
+        # most frequent first
+        habits.sort(key=lambda x: -x['total_count'])
+
+        if self.verbose:
+            print(f"\n[HabitTracker] 检出 {len(habits)} 个持续活跃事项:")
+            for item in habits:
+                weeks_str = ",".join(f"W{w}" for w in item['weeks_active'])
+                print(f"  \u2605 {item['task']}: {item['total_count']}次 ({weeks_str}), "
+                      f"溯源行数: {len(item['source_lines'])}")
+
+        return habits
+
+    def run(self, llm_client=None):
+        """Main entry; kept for the old interface."""
+        return self.detect(llm_client)
+
+
+# ============================================================
+# CLI entry
+# ============================================================
+if __name__ == "__main__":
+    # make sure the user history file exists
+    filter_user_histories()
+
+    tracker = HabitTracker()
+    results = tracker.detect()
+
+    # save results
+    output_path = os.path.join(os.path.dirname(__file__), "habit_results.json")
+    with open(output_path, 'w', encoding='utf-8') as f:
+        json.dump(results, f, ensure_ascii=False, indent=2, default=list)
+    print(f"\n结果已保存: {output_path}")
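Both trackers bucket sessions into weeks with the same approximation: day-of-year is estimated as `(month - 1) * 30 + day`, so indices drift slightly near month boundaries. A minimal standalone sketch of that mapping, with invented session names in the assumed `MMDD_HHMM-MMDD_HHMM` format:

```python
# Standalone restatement of _calc_week (session names are invented examples).
def calc_week(session_name: str) -> int:
    start = session_name.split('-')[0]            # "MMDD_HHMM"
    month, day = int(start[:2]), int(start[2:4])
    return ((month - 1) * 30 + day) // 7 + 1      # approximate day-of-year, 1-based week

print(calc_week("0101_0900-0101_1030"))  # 1  (Jan 1  -> day 1)
print(calc_week("0215_2100-0215_2359"))  # 7  (Feb 15 -> day 45)
print(calc_week("0301_0800-0301_0930"))  # 9  (Mar 1  -> day 61)
```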
diff --git a/reflect/analyzers/tri_axis_scanner.py b/reflect/analyzers/tri_axis_scanner.py
new file mode 100644
index 00000000..edf2601a
--- /dev/null
+++ b/reflect/analyzers/tri_axis_scanner.py
@@ -0,0 +1,376 @@
+"""
+Unified scanner (tri-axis)
+Purpose: run the emotion scan, habit tracking, and vanished-topic detection in one call
+Design: incremental scanning + persisted state + scheduled-task integration
+
+Usage:
+    # as a module
+    from tri_axis_scanner import TriAxisScanner
+    scanner = TriAxisScanner()
+    report = scanner.run()
+
+    # command line
+    python tri_axis_scanner.py [--full]  # --full forces a full scan
+
+Output: a report dict, also written to scan_report.json
+
+Incremental strategy:
+    Emotion scan: remember the last scanned line number and only scan the new tail
+    Habits/vanished: always full (week-level matrices keep the volume manageable)
+
+Suggested schedule: every_3d (every 3 days) or weekly
+"""
+
+import sys, os, json, time, traceback
+from datetime import datetime
+
+# unified config: path discovery + LLM config + data preprocessing (package import or direct run)
+try:
+    from reflect.analyzers._config import (
+        PROJECT_ROOT, HIST_PATH, get_llm_config, ensure_histories, filter_user_histories
+    )
+except (ImportError, ModuleNotFoundError):
+    from _config import PROJECT_ROOT, HIST_PATH, get_llm_config, ensure_histories, filter_user_histories
+
+# ============================================================
+# Configuration
+# ============================================================
+BASE_DIR = os.path.dirname(os.path.abspath(__file__))
+STATE_FILE = os.path.join(BASE_DIR, "scan_state.json")
+REPORT_FILE = os.path.join(BASE_DIR, "scan_report.json")
+
+# emotion-scan increment: skip the scan when fewer new lines than this have accumulated
+EMOTION_MIN_NEW_LINES = 200
+
+
+class TriAxisScanner:
+    """Unified scan dispatcher"""
+
+    def __init__(self, verbose=True, force_full=False):
+        self.verbose = verbose
+        self.force_full = force_full
+        self.state = self._load_state()
+        self.report = {
+            "scan_time": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+            "emotion": None,
+            "habits": None,
+            "abandoned": None,
+            "errors": [],
+            "summary": ""
+        }
+
+    # ============================================================
+    # State management
+    # ============================================================
+    def _load_state(self):
+        """Load the previous scan state."""
+        if os.path.exists(STATE_FILE):
+            try:
+                with open(STATE_FILE, 'r', encoding='utf-8') as f:
+                    return json.load(f)
+            except (json.JSONDecodeError, IOError):
+                pass
+        return {
+            "last_scan_time": None,
+            "emotion_last_line": 0,
+            "habits_last_scan": None,
+            "scan_count": 0
+        }
+
+    def _save_state(self):
+        """Persist the scan state."""
+        self.state["last_scan_time"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+        self.state["scan_count"] = self.state.get("scan_count", 0) + 1
+        with open(STATE_FILE, 'w', encoding='utf-8') as f:
+            json.dump(self.state, f, ensure_ascii=False, indent=2)
+
+    # ============================================================
+    # Emotion scan (incremental)
+    # ============================================================
+    def _run_emotion(self):
+        """Run the emotion scanner, incrementally over newly added lines."""
+        if self.verbose:
+            print("\n" + "="*60)
+            print("[情绪扫描] Emotion Scanner")
+            print("="*60)
+
+        # work on the user-only history file
+        user_hist = filter_user_histories()
+        with open(user_hist, 'r', encoding='utf-8') as f:
+            total_lines = sum(1 for _ in f)
+
+        last_line = 0 if self.force_full else self.state.get("emotion_last_line", 0)
+        new_lines = total_lines - last_line
+
+        if new_lines < EMOTION_MIN_NEW_LINES and not self.force_full:
+            msg = f"新增行数不足({new_lines}<{EMOTION_MIN_NEW_LINES}),跳过情绪扫描"
+            if self.verbose:
+                print(f"  [SKIP] {msg}")
+            return {"skipped": True, "reason": msg, "new_lines": new_lines}
+
+        if self.verbose:
+            print(f"  扫描范围: L{last_line+1} ~ L{total_lines} ({new_lines}行)")
+
+        try:
+            from .emotion_scanner import EmotionScanner
+        except ImportError:
+            from emotion_scanner import EmotionScanner
+
+        scanner = EmotionScanner(hist_path=user_hist, verbose=self.verbose)
+        start_line = last_line + 1 if last_line > 0 else None
+        results = scanner.run(start_line=start_line)
+
+        # update state
+        self.state["emotion_last_line"] = total_lines
+
+        # full report: keep every hit, with the clusters as a secondary view
+        emotion_report = {
+            "scan_range": [last_line + 1, total_lines],
+            "new_lines_scanned": new_lines,
+            "all_detections": results.get("all_detections", []),
+            "clusters": results.get("tier1_clusters", []),
+            "tier2_isolated": results.get("tier2_isolated", []),
+            "stats": results.get("stats", {}),
+        }
+
+        if self.verbose:
+            t1 = len(emotion_report["clusters"])
+            t2 = len(emotion_report.get("all_detections", []))
+            print(f"  [OK] 完成: {t1}个波动区 + {t2}条检出")
+
+        return emotion_report
item.get("source_lines", []) + } for item in results] + } + + if self.verbose: + print(f" [OK] 习惯追踪: {habits_report['count']}项持续活跃") + for item in results[:5]: + print(f" * {item['task']} ({item['total_count']}次, span={item['span']}周)") + + return habits_report + + # ============================================================ + # 消失事项检测 (全量) + # ============================================================ + def _run_abandoned(self): + """运行消失事项检测,全量扫描""" + if self.verbose: + print("\n" + "="*60) + print("[消失检测] Abandoned Detector") + print("="*60) + + user_hist = filter_user_histories() + + try: + from .abandoned_detector import AbandonedDetector + except ImportError: + from abandoned_detector import AbandonedDetector + + detector = AbandonedDetector(hist_path=user_hist, verbose=self.verbose) + results = detector.detect() + + abandoned_report = { + "count": len(results), + "items": [{ + "task": item["task"], + "weeks_active": item["weeks_active"], + "total_count": item["total_count"], + "last_week": item["last_week"], + "gap": item["gap"] + } for item in results[:20]] + } + + if self.verbose: + print(f" [OK] 消失检测: {abandoned_report['count']}项已消失") + for item in results[:5]: + print(f" [X] {item['task']} ({item['total_count']}次, gap={item['gap']}周)") + + return abandoned_report + + # ============================================================ + # 主入口 + # ============================================================ + def run(self): + """运行统一扫描,返回完整报告""" + # 确保 all_histories.txt 存在 + ensure_histories() + + start_time = time.time() + + if self.verbose: + print("============================================================") + print(" 统一扫描器 (Unified Scanner)") + print("============================================================") + mode = "全量" if self.force_full else "增量" + print(f" 模式: {mode} | 第{self.state.get('scan_count',0)+1}次扫描") + print(f" 时间: {self.report['scan_time']}") + print("============================================================") + + # --- 情绪扫描 --- + try: + self.report["emotion"] = self._run_emotion() + except Exception as e: + err = f"情绪扫描异常: {e}\n{traceback.format_exc()}" + self.report["errors"].append(err) + self.report["emotion"] = {"error": str(e)} + if self.verbose: + print(f" [X] 情绪扫描失败: {e}") + + # --- 习惯追踪 --- + try: + self.report["habits"] = self._run_habits() + except Exception as e: + err = f"习惯追踪异常: {e}\n{traceback.format_exc()}" + self.report["errors"].append(err) + self.report["habits"] = {"error": str(e)} + if self.verbose: + print(f" [X] 习惯追踪失败: {e}") + + # --- 消失检测 --- + try: + self.report["abandoned"] = self._run_abandoned() + except Exception as e: + err = f"消失检测异常: {e}\n{traceback.format_exc()}" + self.report["errors"].append(err) + self.report["abandoned"] = {"error": str(e)} + if self.verbose: + print(f" [X] 消失检测失败: {e}") + + # --- 汇总 --- + elapsed = time.time() - start_time + self.report["elapsed_seconds"] = round(elapsed, 1) + self.report["summary"] = self._build_summary() + + # 保存报告和状态 + with open(REPORT_FILE, 'w', encoding='utf-8') as f: + json.dump(self.report, f, ensure_ascii=False, indent=2) + self._save_state() + + # 写 done 目录的 md 报告(供 scheduler 判断完成状态) + self._write_done_report() + + if self.verbose: + print(f"\n{'='*60}") + print(f"扫描完成 ({elapsed:.1f}s)") + print(f"报告: {REPORT_FILE}") + print(f"{'='*60}") + print(f"\n{self.report['summary']}") + + return self.report + + def _write_done_report(self): + """写 sche_tasks/done/ 目录的 md 报告""" + done_dir = os.path.join(PROJECT_ROOT, 'sche_tasks', 'done') + os.makedirs(done_dir, exist_ok=True) + ts 
+    def _write_done_report(self):
+        """Write the markdown report into sche_tasks/done/."""
+        done_dir = os.path.join(PROJECT_ROOT, 'sche_tasks', 'done')
+        os.makedirs(done_dir, exist_ok=True)
+        ts = datetime.now().strftime('%Y-%m-%d_%H%M')
+        rpt_path = os.path.join(done_dir, f'{ts}_tri_axis_scan.md')
+
+        lines = [
+            '# 三轴统一扫描报告',
+            f'- 执行时间: {datetime.now().strftime("%Y-%m-%d %H:%M")}',
+            f'- 模式: {"全量" if self.force_full else "增量"} | '
+            f'第{self.state.get("scan_count", 0) + 1}次扫描',
+            f'- 耗时: {self.report.get("elapsed_seconds", "?")}s',
+            '',
+            '## 核心结论',
+            self.report.get('summary', '无'),
+            '',
+            '## 详细数据',
+            f'完整结构化数据见: {REPORT_FILE}',
+            '',
+            '## 错误记录',
+        ]
+        errors = self.report.get('errors', [])
+        if errors:
+            for e in errors:
+                lines.append(f'- {e}')
+        else:
+            lines.append('无')
+
+        with open(rpt_path, 'w', encoding='utf-8') as f:
+            f.write('\n'.join(lines))
+
+        if self.verbose:
+            print(f"  Done报告: {rpt_path}")
+
+    def _build_summary(self):
+        """Build a human-readable summary line."""
+        lines = []
+
+        # emotion
+        emo = self.report.get("emotion", {})
+        if emo.get("skipped"):
+            lines.append(f"情绪: 跳过({emo.get('reason','')})")
+        elif emo.get("error"):
+            lines.append(f"情绪: 错误 - {emo['error']}")
+        else:
+            t1 = len(emo.get("clusters", []))
+            t2 = len(emo.get("tier2_isolated", []))
+            lines.append(f"情绪: {t1}个波动区 + {t2}个高强度点")
+
+        # habits
+        hab = self.report.get("habits", {})
+        if hab.get("error"):
+            lines.append(f"习惯: 错误 - {hab['error']}")
+        else:
+            count = hab.get("count", 0)
+            names = [item["task"] for item in hab.get("items", [])[:3]]
+            lines.append(f"习惯: {count}项 [{', '.join(names)}]")
+
+        # vanished
+        abd = self.report.get("abandoned", {})
+        if abd.get("error"):
+            lines.append(f"消失: 错误 - {abd['error']}")
+        else:
+            count = abd.get("count", 0)
+            names = [item["task"] for item in abd.get("items", [])[:3]]
+            lines.append(f"消失: {count}项 [{', '.join(names)}]")
+
+        return " | ".join(lines)
+
+
+# ============================================================
+# CLI entry
+# ============================================================
+if __name__ == "__main__":
+    force = "--full" in sys.argv
+    try:
+        scanner = TriAxisScanner(verbose=True, force_full=force)
+        report = scanner.run()
+        sys.exit(0)
+    except Exception:
+        traceback.print_exc()
+        sys.exit(1)
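The incremental rule `_run_emotion()` applies is worth seeing in isolation: a scan only runs once enough new lines have accumulated past the persisted watermark, and the watermark moves forward afterwards. A minimal sketch mirroring that logic; the state values below are invented:

```python
# Sketch of the emotion scan's skip/advance rule (names mirror the scanner; data is invented).
import json

EMOTION_MIN_NEW_LINES = 200
state = {"emotion_last_line": 5400, "scan_count": 11}   # as loaded from scan_state.json
total_lines = 5520                                       # current size of the user history

new_lines = total_lines - state["emotion_last_line"]
if new_lines < EMOTION_MIN_NEW_LINES:
    print(f"skip: only {new_lines} new lines (< {EMOTION_MIN_NEW_LINES})")
else:
    print(f"scan L{state['emotion_last_line'] + 1} ~ L{total_lines}")
    state["emotion_last_line"] = total_lines             # persisted via _save_state()

print(json.dumps(state))
```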
diff --git a/reflect/scheduler.py b/reflect/scheduler.py
index 28d701ef..a673a418 100644
--- a/reflect/scheduler.py
+++ b/reflect/scheduler.py
@@ -1,4 +1,4 @@
-import os, json, time as _time, socket as _socket, logging
+import os, sys, json, time as _time, socket as _socket, logging, subprocess
 from datetime import datetime, timedelta
 
 # 端口锁:防止重复启动,bind失败时agentmain会直接崩溃退出
@@ -28,6 +28,47 @@
 # 默认最大延迟窗口(小时),超过此时间不触发
 DEFAULT_MAX_DELAY = 6
 _l4_t = 0  # last L4 archive time
+_running = {}  # tid -> Popen; tracks direct-mode child processes
+
+def _exec_direct(tid, task):
+    """Run the task script directly as a child process — no agent, no timeout."""
+    global _running
+    # if a process with this tid already exists, check whether it has finished
+    if tid in _running:
+        proc = _running[tid]
+        if proc.poll() is None:
+            return  # still running: skip this trigger
+        else:
+            del _running[tid]  # finished: allow a re-trigger
+
+    script = task.get('exec_script', '')
+    if not script:
+        _logger.error(f'direct task {tid} missing exec_script')
+        return
+
+    script_path = os.path.normpath(os.path.join(TASKS, '..', script))
+    if not os.path.isfile(script_path):
+        _logger.error(f'direct task {tid} script not found: {script_path}')
+        return
+
+    python = os.path.join(os.path.dirname(sys.executable), 'python.exe')
+    if not os.path.isfile(python):
+        python = sys.executable  # fallback (non-Windows or renamed interpreter)
+
+    _logger.info(f'DIRECT_START {tid}: {script_path}')
+    env = os.environ.copy()
+    env['PYTHONIOENCODING'] = 'utf-8'
+    log_path = os.path.join(DONE, f'{tid}_stderr.log')
+    stderr_f = open(log_path, 'w', encoding='utf-8')
+    proc = subprocess.Popen(
+        [python, script_path],
+        cwd=os.path.dirname(script_path),
+        stdout=subprocess.DEVNULL,
+        stderr=stderr_f,
+        env=env,
+    )
+    _running[tid] = proc
+    proc._stderr_file = stderr_f  # keep a reference so GC doesn't close the handle
 
 def _parse_cooldown(repeat):
     """解析repeat为冷却时间(比实际周期略短,防漂移)"""
@@ -119,6 +160,12 @@ def check():
         # 触发
         _logger.info(f'TRIGGER {tid} (repeat={repeat}, schedule={sched}, '
                      f'last_run={last})')
+
+        # exec_mode=direct: run the script as a child process, bypassing the agent
+        if task.get('exec_mode') == 'direct':
+            _exec_direct(tid, task)
+            continue
+
         ts = now.strftime('%Y-%m-%d_%H%M')
         rpt = os.path.join(DONE, f'{ts}_{tid}.md')
         prompt = task.get('prompt', '')
diff --git a/sche_tasks/tri_axis_scan.json b/sche_tasks/tri_axis_scan.json
new file mode 100644
index 00000000..b217016e
--- /dev/null
+++ b/sche_tasks/tri_axis_scan.json
@@ -0,0 +1,8 @@
+{
+    "schedule": "03:00",
+    "repeat": "weekly",
+    "enabled": true,
+    "max_delay_hours": 12,
+    "exec_mode": "direct",
+    "exec_script": "reflect/analyzers/tri_axis_scanner.py"
+}
\ No newline at end of file
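The core contract of the scheduler's new `direct` mode is easy to miss inside the diff: spawn the script once, remember the `Popen` handle, and refuse to re-trigger while `poll()` returns `None`. A stripped-down sketch of that behavior; file names are illustrative, and the real scheduler additionally redirects stderr to `sche_tasks/done/<tid>_stderr.log` and sets `PYTHONIOENCODING`:

```python
# Minimal restatement of the direct-mode double-start guard (illustrative paths).
import subprocess, sys

_running = {}

def exec_direct(tid: str, script_path: str) -> None:
    proc = _running.get(tid)
    if proc is not None and proc.poll() is None:
        return  # previous run still alive: do not double-start
    _running[tid] = subprocess.Popen([sys.executable, script_path])

exec_direct("tri_axis_scan", "reflect/analyzers/tri_axis_scanner.py")
exec_direct("tri_axis_scan", "reflect/analyzers/tri_axis_scanner.py")  # no-op while running
```

Because the scheduler keeps no timeout for direct tasks, the guard above is what prevents a slow weekly scan from stacking multiple interpreter processes.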