Skip to content

Commit fc0857e

Browse files
committed
feat: concept dedup with briefs, update/related paths, extract _compile_concepts
1 parent 4f1d332 commit fc0857e

2 files changed

Lines changed: 301 additions & 143 deletions

File tree

openkb/agent/compiler.py

Lines changed: 133 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -396,127 +396,144 @@ def _update_index(wiki_dir: Path, doc_name: str, concept_names: list[str]) -> No
396396
DEFAULT_COMPILE_CONCURRENCY = 5
397397

398398

399-
async def compile_short_doc(
400-
doc_name: str,
401-
source_path: Path,
399+
async def _compile_concepts(
400+
wiki_dir: Path,
402401
kb_dir: Path,
403402
model: str,
404-
max_concurrency: int = DEFAULT_COMPILE_CONCURRENCY,
403+
system_msg: dict,
404+
doc_msg: dict,
405+
summary: str,
406+
doc_name: str,
407+
max_concurrency: int,
405408
) -> None:
406-
"""Compile a short document using a multi-step LLM pipeline with caching.
409+
"""Shared Steps 2-4: concepts plan → generate/update → index.
407410
408-
Step 1: Build base context A (schema + doc content).
409-
Step 2: A → generate summary.
410-
Step 3: A + summary → extract concept list.
411-
Step 4: Concurrent LLM calls (A cached) → generate each concept page.
412-
Step 5: Code writes files, updates index.
411+
Uses ``_CONCEPTS_PLAN_USER`` to get a plan with create/update/related
412+
actions, then executes each action type accordingly.
413413
"""
414-
from openkb.config import load_config
415-
416-
openkb_dir = kb_dir / ".openkb"
417-
config = load_config(openkb_dir / "config.yaml")
418-
language: str = config.get("language", "en")
419-
420-
wiki_dir = kb_dir / "wiki"
421-
schema_md = get_agents_md(wiki_dir)
422414
source_file = _find_source_filename(doc_name, kb_dir)
423-
content = source_path.read_text(encoding="utf-8")
424415

425-
# Base context A: system + document
426-
system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format(
427-
schema_md=schema_md, language=language,
428-
)}
429-
doc_msg = {"role": "user", "content": _SUMMARY_USER.format(
430-
doc_name=doc_name, content=content,
431-
)}
416+
# --- Step 2: Get concepts plan (A cached) ---
417+
concept_briefs = _read_concept_briefs(wiki_dir)
432418

433-
# --- Step 1: Generate summary ---
434-
summary = _llm_call(model, [system_msg, doc_msg], "summary")
435-
_write_summary(wiki_dir, doc_name, source_file, summary)
436-
437-
# --- Step 2: Extract concept list (A cached) ---
438-
_, existing_concepts = _read_wiki_context(wiki_dir)
439-
440-
concepts_list_raw = _llm_call(model, [
419+
plan_raw = _llm_call(model, [
441420
system_msg,
442421
doc_msg,
443422
{"role": "assistant", "content": summary},
444-
{"role": "user", "content": _CONCEPTS_LIST_USER.format(
445-
existing_concepts=", ".join(existing_concepts) if existing_concepts else "(none yet)",
423+
{"role": "user", "content": _CONCEPTS_PLAN_USER.format(
424+
concept_briefs=concept_briefs,
446425
)},
447-
], "concepts-list", max_tokens=512)
426+
], "concepts-plan", max_tokens=1024)
448427

449428
try:
450-
concepts_list = _parse_json(concepts_list_raw)
429+
parsed = _parse_json(plan_raw)
451430
except (json.JSONDecodeError, ValueError) as exc:
452-
logger.warning("Failed to parse concepts list: %s", exc)
453-
logger.debug("Raw: %s", concepts_list_raw)
431+
logger.warning("Failed to parse concepts plan: %s", exc)
432+
logger.debug("Raw: %s", plan_raw)
454433
_update_index(wiki_dir, doc_name, [])
455434
return
456435

457-
if not concepts_list:
436+
# Fallback: if LLM returns a flat list, treat all items as "create"
437+
if isinstance(parsed, list):
438+
plan = {"create": parsed, "update": [], "related": []}
439+
else:
440+
plan = {
441+
"create": parsed.get("create", []),
442+
"update": parsed.get("update", []),
443+
"related": parsed.get("related", []),
444+
}
445+
446+
create_items = plan["create"]
447+
update_items = plan["update"]
448+
related_items = plan["related"]
449+
450+
if not create_items and not update_items and not related_items:
458451
_update_index(wiki_dir, doc_name, [])
459452
return
460453

461-
# --- Step 3: Generate concept pages concurrently (A cached) ---
454+
# --- Step 3: Generate/update concept pages concurrently (A cached) ---
462455
semaphore = asyncio.Semaphore(max_concurrency)
463456

464-
async def _gen_concept(concept: dict) -> tuple[str, str, bool]:
457+
async def _gen_create(concept: dict) -> tuple[str, str, bool]:
465458
name = concept["name"]
466459
title = concept.get("title", name)
467-
is_update = concept.get("is_update", False)
468-
update_instruction = (
469-
"This concept page already exists. Add new information from this document "
470-
"without duplicating existing content."
471-
if is_update else ""
472-
)
473-
474460
async with semaphore:
475461
page_content = await _llm_call_async(model, [
476462
system_msg,
477463
doc_msg,
478464
{"role": "assistant", "content": summary},
479465
{"role": "user", "content": _CONCEPT_PAGE_USER.format(
480466
title=title, doc_name=doc_name,
481-
update_instruction=update_instruction,
467+
update_instruction="",
482468
)},
483469
], f"concept:{name}")
470+
return name, page_content, False
484471

485-
return name, page_content, is_update
472+
async def _gen_update(concept: dict) -> tuple[str, str, bool]:
473+
name = concept["name"]
474+
title = concept.get("title", name)
475+
concept_path = wiki_dir / "concepts" / f"{name}.md"
476+
if concept_path.exists():
477+
raw_text = concept_path.read_text(encoding="utf-8")
478+
if raw_text.startswith("---"):
479+
parts = raw_text.split("---", 2)
480+
existing_content = parts[2].strip() if len(parts) >= 3 else raw_text
481+
else:
482+
existing_content = raw_text
483+
else:
484+
existing_content = "(page not found — create from scratch)"
485+
async with semaphore:
486+
page_content = await _llm_call_async(model, [
487+
system_msg,
488+
doc_msg,
489+
{"role": "assistant", "content": summary},
490+
{"role": "user", "content": _CONCEPT_UPDATE_USER.format(
491+
title=title, doc_name=doc_name,
492+
existing_content=existing_content,
493+
)},
494+
], f"update:{name}")
495+
return name, page_content, True
486496

487-
sys.stdout.write(f" Generating {len(concepts_list)} concept(s) (concurrency={max_concurrency})...\n")
488-
sys.stdout.flush()
497+
tasks = []
498+
tasks.extend(_gen_create(c) for c in create_items)
499+
tasks.extend(_gen_update(c) for c in update_items)
489500

490-
results = await asyncio.gather(
491-
*[_gen_concept(c) for c in concepts_list],
492-
return_exceptions=True,
493-
)
501+
concept_names: list[str] = []
494502

495-
concept_names = []
496-
for r in results:
497-
if isinstance(r, Exception):
498-
logger.warning("Concept generation failed: %s", r)
499-
continue
500-
name, page_content, is_update = r
501-
_write_concept(wiki_dir, name, page_content, source_file, is_update)
502-
concept_names.append(name)
503+
if tasks:
504+
total = len(tasks)
505+
sys.stdout.write(f" Generating {total} concept(s) (concurrency={max_concurrency})...\n")
506+
sys.stdout.flush()
507+
508+
results = await asyncio.gather(*tasks, return_exceptions=True)
509+
510+
for r in results:
511+
if isinstance(r, Exception):
512+
logger.warning("Concept generation failed: %s", r)
513+
continue
514+
name, page_content, is_update = r
515+
_write_concept(wiki_dir, name, page_content, source_file, is_update)
516+
concept_names.append(name)
517+
518+
# --- Step 3b: Process related items (code only, no LLM) ---
519+
for slug in related_items:
520+
_add_related_link(wiki_dir, slug, doc_name, source_file)
503521

504522
# --- Step 4: Update index (code only) ---
505523
_update_index(wiki_dir, doc_name, concept_names)
506524

507525

508-
async def compile_long_doc(
526+
async def compile_short_doc(
509527
doc_name: str,
510-
summary_path: Path,
511-
doc_id: str,
528+
source_path: Path,
512529
kb_dir: Path,
513530
model: str,
514531
max_concurrency: int = DEFAULT_COMPILE_CONCURRENCY,
515532
) -> None:
516-
"""Compile a long (PageIndex) document's concepts and index.
533+
"""Compile a short document using a multi-step LLM pipeline with caching.
517534
518-
The summary page is already written by the indexer. This function
519-
generates concept pages and updates the index.
535+
Step 1: Build base context A (schema + doc content), generate summary.
536+
Steps 2-4: Delegated to ``_compile_concepts``.
520537
"""
521538
from openkb.config import load_config
522539

@@ -527,84 +544,63 @@ async def compile_long_doc(
527544
wiki_dir = kb_dir / "wiki"
528545
schema_md = get_agents_md(wiki_dir)
529546
source_file = _find_source_filename(doc_name, kb_dir)
530-
summary = summary_path.read_text(encoding="utf-8")
547+
content = source_path.read_text(encoding="utf-8")
531548

532-
# Base context A
549+
# Base context A: system + document
533550
system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format(
534551
schema_md=schema_md, language=language,
535552
)}
536-
doc_msg = {"role": "user", "content": _LONG_DOC_SUMMARY_USER.format(
537-
doc_name=doc_name, doc_id=doc_id, content=summary,
553+
doc_msg = {"role": "user", "content": _SUMMARY_USER.format(
554+
doc_name=doc_name, content=content,
538555
)}
539556

540-
# --- Step 1: Extract concept list ---
541-
_, existing_concepts = _read_wiki_context(wiki_dir)
542-
543-
# Get a concise overview first (for concept generation context)
544-
overview = _llm_call(model, [system_msg, doc_msg], "overview")
557+
# --- Step 1: Generate summary ---
558+
summary = _llm_call(model, [system_msg, doc_msg], "summary")
559+
_write_summary(wiki_dir, doc_name, source_file, summary)
545560

546-
concepts_list_raw = _llm_call(model, [
547-
system_msg,
548-
doc_msg,
549-
{"role": "assistant", "content": overview},
550-
{"role": "user", "content": _CONCEPTS_LIST_USER.format(
551-
existing_concepts=", ".join(existing_concepts) if existing_concepts else "(none yet)",
552-
)},
553-
], "concepts-list", max_tokens=512)
561+
# --- Steps 2-4: Concept plan → generate/update → index ---
562+
await _compile_concepts(
563+
wiki_dir, kb_dir, model, system_msg, doc_msg,
564+
summary, doc_name, max_concurrency,
565+
)
554566

555-
try:
556-
concepts_list = _parse_json(concepts_list_raw)
557-
except (json.JSONDecodeError, ValueError) as exc:
558-
logger.warning("Failed to parse concepts list: %s", exc)
559-
logger.debug("Raw: %s", concepts_list_raw)
560-
_update_index(wiki_dir, doc_name, [])
561-
return
562567

563-
if not concepts_list:
564-
_update_index(wiki_dir, doc_name, [])
565-
return
568+
async def compile_long_doc(
569+
doc_name: str,
570+
summary_path: Path,
571+
doc_id: str,
572+
kb_dir: Path,
573+
model: str,
574+
max_concurrency: int = DEFAULT_COMPILE_CONCURRENCY,
575+
) -> None:
576+
"""Compile a long (PageIndex) document's concepts and index.
566577
567-
# --- Step 2: Generate concept pages concurrently ---
568-
semaphore = asyncio.Semaphore(max_concurrency)
578+
The summary page is already written by the indexer. This function
579+
generates concept pages and updates the index.
580+
"""
581+
from openkb.config import load_config
569582

570-
async def _gen_concept(concept: dict) -> tuple[str, str, bool]:
571-
name = concept["name"]
572-
title = concept.get("title", name)
573-
is_update = concept.get("is_update", False)
574-
update_instruction = (
575-
"This concept page already exists. Add new information."
576-
if is_update else ""
577-
)
583+
openkb_dir = kb_dir / ".openkb"
584+
config = load_config(openkb_dir / "config.yaml")
585+
language: str = config.get("language", "en")
578586

579-
async with semaphore:
580-
page_content = await _llm_call_async(model, [
581-
system_msg,
582-
doc_msg,
583-
{"role": "assistant", "content": overview},
584-
{"role": "user", "content": _CONCEPT_PAGE_USER.format(
585-
title=title, doc_name=doc_name,
586-
update_instruction=update_instruction,
587-
)},
588-
], f"concept:{name}")
587+
wiki_dir = kb_dir / "wiki"
588+
schema_md = get_agents_md(wiki_dir)
589+
summary_content = summary_path.read_text(encoding="utf-8")
589590

590-
return name, page_content, is_update
591+
# Base context A
592+
system_msg = {"role": "system", "content": _SYSTEM_TEMPLATE.format(
593+
schema_md=schema_md, language=language,
594+
)}
595+
doc_msg = {"role": "user", "content": _LONG_DOC_SUMMARY_USER.format(
596+
doc_name=doc_name, doc_id=doc_id, content=summary_content,
597+
)}
591598

592-
sys.stdout.write(f" Generating {len(concepts_list)} concept(s) (concurrency={max_concurrency})...\n")
593-
sys.stdout.flush()
599+
# --- Step 1: Generate overview ---
600+
overview = _llm_call(model, [system_msg, doc_msg], "overview")
594601

595-
results = await asyncio.gather(
596-
*[_gen_concept(c) for c in concepts_list],
597-
return_exceptions=True,
602+
# --- Steps 2-4: Concept plan → generate/update → index ---
603+
await _compile_concepts(
604+
wiki_dir, kb_dir, model, system_msg, doc_msg,
605+
overview, doc_name, max_concurrency,
598606
)
599-
600-
concept_names = []
601-
for r in results:
602-
if isinstance(r, Exception):
603-
logger.warning("Concept generation failed: %s", r)
604-
continue
605-
name, page_content, is_update = r
606-
_write_concept(wiki_dir, name, page_content, source_file, is_update)
607-
concept_names.append(name)
608-
609-
# --- Step 3: Update index (code only) ---
610-
_update_index(wiki_dir, doc_name, concept_names)

0 commit comments

Comments
 (0)