Skip to content

Commit 39ae5c5

Browse files
committed
feat: add get_page_content tool and parse_pages helper
Adds parse_pages() to expand page specs like "1-3,7" into sorted deduplicated int lists, and get_page_content() to read per-page JSON (sources/{doc}.json) and format output with optional image paths. Includes path-traversal guard consistent with existing tools.
1 parent 072d9f5 commit 39ae5c5

2 files changed

Lines changed: 167 additions & 1 deletion

File tree

openkb/agent/tools.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
"""
77
from __future__ import annotations
88

9+
import json as _json
910
from pathlib import Path
1011

1112

@@ -52,6 +53,86 @@ def read_wiki_file(path: str, wiki_root: str) -> str:
5253
return full_path.read_text(encoding="utf-8")
5354

5455

56+
def parse_pages(pages: str) -> list[int]:
    """Parse a page specification string into a sorted, deduplicated list of page numbers.

    Args:
        pages: Page spec such as ``"3-5,7,10-12"``.

    Returns:
        Sorted list of positive page numbers, e.g. ``[3, 4, 5, 7, 10, 11, 12]``.
        Malformed parts (e.g. ``"abc"``, ``"-1"``) and non-positive numbers are
        silently ignored; an empty or descending range (``"5-3"``) yields nothing.
    """
    result: set[int] = set()
    for part in pages.split(","):
        part = part.strip()
        if not part:
            continue
        start_s, sep, end_s = part.partition("-")
        try:
            if sep:
                # Inclusive range like "3-5". A leading "-" (negative number)
                # leaves start_s empty, so int("") raises and the part is
                # skipped — negatives could never survive the positivity
                # filter below anyway.
                result.update(range(int(start_s), int(end_s) + 1))
            else:
                result.add(int(part))
        except ValueError:
            # Malformed part — ignore it rather than fail the whole spec.
            pass
    return sorted(n for n in result if n > 0)
90+
91+
92+
def get_page_content(doc_name: str, pages: str, wiki_root: str) -> str:
    """Return formatted content for specified pages of a document.

    Reads ``{wiki_root}/sources/{doc_name}.json`` which must be a JSON array of
    objects with at least ``{"page": int, "content": str}`` fields and an
    optional ``"images"`` list of ``{"path": str, ...}`` objects.

    Args:
        doc_name: Document name without extension (e.g. ``"paper"``).
        pages: Page specification string (e.g. ``"1-3,7"``).
        wiki_root: Absolute path to the wiki root directory.

    Returns:
        Formatted page content, or an error message string.
    """
    base = Path(wiki_root).resolve()
    source_file = (base / "sources" / f"{doc_name}.json").resolve()

    # Traversal guard: doc_name like "../../etc/passwd" resolves outside base.
    if not source_file.is_relative_to(base):
        return "Access denied: path escapes wiki root."
    if not source_file.exists():
        return f"File not found: sources/{doc_name}.json"

    entries = _json.loads(source_file.read_text(encoding="utf-8"))
    wanted = set(parse_pages(pages))
    selected = [e for e in entries if e.get("page") in wanted]
    if not selected:
        return f"No content found for pages {pages} in {doc_name}."

    blocks: list[str] = []
    for entry in selected:
        text = f"[Page {entry['page']}]\n{entry.get('content', '')}"
        # Append an image-path summary line only when at least one path exists.
        image_paths = ", ".join(
            img["path"] for img in entry.get("images") or [] if "path" in img
        )
        if image_paths:
            text += f"\n[Images: {image_paths}]"
        blocks.append(text)
    return "\n\n".join(blocks) + "\n\n"
134+
135+
55136
def write_wiki_file(path: str, content: str, wiki_root: str) -> str:
56137
"""Write or overwrite a Markdown file in the wiki.
57138

tests/test_agent_tools.py

Lines changed: 86 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
import pytest
77

8-
from openkb.agent.tools import list_wiki_files, read_wiki_file, write_wiki_file
8+
from openkb.agent.tools import get_page_content, list_wiki_files, parse_pages, read_wiki_file, write_wiki_file
99

1010

1111
# ---------------------------------------------------------------------------
@@ -128,3 +128,88 @@ def test_returns_written_path(self, tmp_path):
128128
result = write_wiki_file("reports/health.md", "All good.", wiki_root)
129129

130130
assert result == "Written: reports/health.md"
131+
132+
133+
# ---------------------------------------------------------------------------
134+
# parse_pages
135+
# ---------------------------------------------------------------------------
136+
137+
138+
class TestParsePages:
    """Behavioral tests for the parse_pages() page-spec parser."""

    def test_single_page(self):
        result = parse_pages("3")
        assert result == [3]

    def test_range(self):
        result = parse_pages("3-5")
        assert result == [3, 4, 5]

    def test_comma_separated(self):
        result = parse_pages("1,3,5")
        assert result == [1, 3, 5]

    def test_mixed(self):
        result = parse_pages("1-3,7,10-12")
        assert result == [1, 2, 3, 7, 10, 11, 12]

    def test_deduplication(self):
        result = parse_pages("3,3,3")
        assert result == [3]

    def test_sorted(self):
        result = parse_pages("5,1,3")
        assert result == [1, 3, 5]

    def test_ignores_zero_and_negative(self):
        result = parse_pages("0,-1,3")
        assert result == [3]
159+
160+
161+
# ---------------------------------------------------------------------------
162+
# get_page_content
163+
# ---------------------------------------------------------------------------
164+
165+
166+
class TestGetPageContent:
    """Behavioral tests for get_page_content()."""

    @staticmethod
    def _write_doc(tmp_path, name, entries):
        """Write sources/{name}.json under tmp_path and return the wiki root string."""
        import json

        sources_dir = tmp_path / "sources"
        sources_dir.mkdir()
        doc_path = sources_dir / f"{name}.json"
        doc_path.write_text(json.dumps(entries), encoding="utf-8")
        return str(tmp_path)

    def test_reads_pages_from_json(self, tmp_path):
        wiki_root = self._write_doc(
            tmp_path,
            "paper",
            [
                {"page": 1, "content": "Page one text."},
                {"page": 2, "content": "Page two text."},
                {"page": 3, "content": "Page three text."},
            ],
        )
        result = get_page_content("paper", "1,3", wiki_root)
        assert "[Page 1]" in result
        assert "Page one text." in result
        assert "[Page 3]" in result
        assert "Page three text." in result
        assert "Page two" not in result

    def test_returns_error_for_missing_file(self, tmp_path):
        (tmp_path / "sources").mkdir()
        result = get_page_content("nonexistent", "1", str(tmp_path))
        assert "not found" in result.lower()

    def test_returns_error_for_no_matching_pages(self, tmp_path):
        wiki_root = self._write_doc(
            tmp_path, "paper", [{"page": 1, "content": "Only page."}]
        )
        result = get_page_content("paper", "99", wiki_root)
        assert "no content" in result.lower()

    def test_includes_images_info(self, tmp_path):
        entries = [
            {
                "page": 1,
                "content": "Text.",
                "images": [
                    {"path": "images/p/img.png", "width": 100, "height": 80}
                ],
            }
        ]
        wiki_root = self._write_doc(tmp_path, "doc", entries)
        result = get_page_content("doc", "1", wiki_root)
        assert "img.png" in result

    def test_path_escape_denied(self, tmp_path):
        (tmp_path / "sources").mkdir()
        result = get_page_content("../../etc/passwd", "1", str(tmp_path))
        assert "denied" in result.lower() or "not found" in result.lower()

0 commit comments

Comments
 (0)