Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
from __future__ import annotations

import pytest

from forecasting_tools.agents_and_tools.source_archive.fetchers.base import FetchError
from forecasting_tools.agents_and_tools.source_archive.models import CaptureResult


class FakeFetcher:
    """In-memory fetcher stub that serves pre-registered captures keyed by URL.

    Every URL passed to ``fetch`` is appended to ``calls``; a URL that was
    never registered through ``add`` makes ``fetch`` raise ``FetchError``.
    """

    name = "fake"

    def __init__(self) -> None:
        self.responses: dict[str, CaptureResult] = {}
        self.calls: list[str] = []

    def add(
        self,
        url: str,
        *,
        html: str | None = None,
        markdown: str | None = None,
        status_code: int = 200,
        screenshot: bytes | None = b"\x89PNG fake",
    ) -> None:
        """Register the capture that ``fetch`` will hand back for ``url``.

        When ``html`` or ``markdown`` is omitted, a filler body built from 80
        repetitions of "content " is used in its place.
        """
        if html is None:
            html = "<html><body>" + "content " * 80 + "</body></html>"
        if markdown is None:
            markdown = "content " * 80
        self.responses[url] = CaptureResult(
            url=url,
            final_url=url,
            status_code=status_code,
            html=html,
            markdown=markdown,
            screenshot=screenshot,
            screenshot_content_type="image/png",
            fetcher=self.name,
        )

    def fetch(self, url: str) -> CaptureResult:
        """Record the call, then return the capture registered for ``url``.

        Raises:
            FetchError: if no capture was registered for ``url``.
        """
        self.calls.append(url)
        if url in self.responses:
            return self.responses[url]
        raise FetchError(f"no canned response for {url}")


@pytest.fixture
def make_fetcher():
    """Provide a factory that builds a fresh ``FakeFetcher`` under a given name."""

    def _build(name: str = "fake") -> FakeFetcher:
        fetcher = FakeFetcher()
        # Override the class-level default on this instance only.
        fetcher.name = name
        return fetcher

    return _build
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

from forecasting_tools.agents_and_tools.source_archive.ingest.metaculus_comments import (
MetaculusCommentHarvester,
)


def _leaderboard():
return {
"leaderboard_entries": [
{"user": {"id": 1, "username": "botA", "is_bot": True}},
{"user": {"id": 2, "username": "human", "is_bot": False}},
{"user": {"id": 3, "username": "botB", "is_bot": True}},
]
}


def test_enumerate_bots_filters_non_bots():
    """``enumerate_bots`` returns only the entries flagged ``is_bot``."""

    def fake_fetch(path, params):
        # The harvester is expected to ask for project 123's leaderboard
        # with entries included.
        assert path == "/leaderboards/project/123/"
        assert params["with_entries"] == "true"
        return _leaderboard()

    harvester = MetaculusCommentHarvester(fetch_json=fake_fetch)
    found = harvester.enumerate_bots(123)
    assert [entry["id"] for entry in found] == [1, 3]


def test_harvest_author_builds_records_with_provenance():
    """A URL inside a comment's text yields one record with provenance fields."""

    def fake_fetch(path, params):
        assert path == "/comments/"
        # Only the first page carries a comment; later offsets are empty.
        if params["offset"] != 0:
            return {"results": []}
        return {
            "results": [{"id": 10, "on_post": 555, "text": "src https://a.test/x"}]
        }

    harvester = MetaculusCommentHarvester(fetch_json=fake_fetch)
    harvested = harvester.harvest_author(1, run_id="r1", bot="botA")
    assert len(harvested) == 1

    record = harvested[0]
    assert record.url == "https://a.test/x"
    assert record.bot == "botA"
    assert record.run_id == "r1"
    assert record.question_id == "555"
    assert record.question_url == "https://www.metaculus.com/questions/555/"
    assert record.trace == "comment:10"
    assert record.origin == "metaculus_comment"


def test_iter_comments_paginates_until_short_page():
    """Iteration requests offsets 0 and 100 and yields all 101 comments."""
    seen_offsets = []

    def fake_fetch(path, params):
        offset = params["offset"]
        seen_offsets.append(offset)
        if offset == 0:
            return {"results": [{"id": i, "text": ""} for i in range(100)]}
        # A one-item page: short page -> iteration is expected to stop here.
        return {"results": [{"id": 999, "text": ""}]}

    harvester = MetaculusCommentHarvester(fetch_json=fake_fetch)
    collected = list(harvester.iter_comments(1))
    assert len(collected) == 101
    assert seen_offsets == [0, 100]


def test_harvest_project_aggregates_bots():
    """``harvest_project`` gathers one record per bot under a project run id."""

    def fake_fetch(path, params):
        if path.startswith("/leaderboards/project/"):
            return _leaderboard()
        # Comment requests: one URL per bot, served on the first page only.
        if params["offset"] != 0:
            return {"results": []}
        author = params["author"]
        comment = {"id": author, "on_post": 1, "text": f"https://bot{author}.test"}
        return {"results": [comment]}

    harvester = MetaculusCommentHarvester(fetch_json=fake_fetch)
    records = harvester.harvest_project(123)

    harvested_urls = {rec.url for rec in records}
    assert harvested_urls == {"https://bot1.test", "https://bot3.test"}
    bot_names = {rec.bot for rec in records}
    assert bot_names == {"botA", "botB"}
    assert all(rec.run_id == "metaculus-comments-123" for rec in records)


def test_custom_base_url_drives_web_base():
    """A custom API ``base_url`` determines the harvester's ``web_base``."""
    harvester = MetaculusCommentHarvester(
        base_url="https://example.org/api",
        fetch_json=lambda p, q: {"results": []},
    )
    assert harvester.web_base == "https://example.org"
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

from datetime import datetime, timedelta, timezone

from forecasting_tools.agents_and_tools.source_archive.config import ArchiveConfig
from forecasting_tools.agents_and_tools.source_archive.content_store import ContentStore
from forecasting_tools.agents_and_tools.source_archive.models import (
CaptureResult,
url_hash,
)
from forecasting_tools.agents_and_tools.source_archive.storage import LocalBlobStore


def _store(tmp_path, **cfg) -> ContentStore:
    """Build a ``ContentStore`` on local blobs under ``tmp_path``.

    Extra keyword arguments are forwarded to ``ArchiveConfig`` alongside the
    fixed ``s3_prefix="t"``.
    """
    blobs = LocalBlobStore(tmp_path)
    config = ArchiveConfig(s3_prefix="t", **cfg)
    return ContentStore(blobs, config)


def _result(url: str, html: str) -> CaptureResult:
    """Build a status-200 ``CaptureResult`` for ``url`` with the given HTML body."""
    fields = {
        "url": url,
        "final_url": url,
        "status_code": 200,
        "html": html,
        "markdown": "md " * 50,
        "screenshot": b"img",
        "screenshot_content_type": "image/png",
        "fetcher": "fake",
    }
    return CaptureResult(**fields)


def test_store_writes_blobs_and_index(tmp_path):
    """Storing a capture reports creation and leaves all three blobs present."""
    store = _store(tmp_path)
    outcome = store.store(_result("https://a.test", "<p>one</p>"))
    assert outcome.created is True

    capture = outcome.capture
    for key_attr in ("html_key", "markdown_key", "screenshot_key"):
        assert store.blobs.exists(getattr(capture, key_attr))


def test_lookup_within_ttl_is_cache_hit(tmp_path):
    """A capture stored moments ago is found by ``lookup`` under a 14-day TTL."""
    url = "https://a.test"
    store = _store(tmp_path, ttl_days=14)
    store.store(_result(url, "<p>one</p>"))
    assert store.lookup(url) is not None


def test_lookup_after_ttl_expires_returns_none(tmp_path):
    """``lookup`` returns ``None`` once every capture's ``last_seen`` is stale."""
    url = "https://a.test"
    store = _store(tmp_path, ttl_days=14)
    store.store(_result(url, "<p>one</p>"))

    # Rewrite the index so each capture was last seen 30 days ago, which is
    # older than the 14-day TTL configured above.
    index_key = url_hash(url)
    index = store._read_index(index_key)
    stale_stamp = (datetime.now(timezone.utc) - timedelta(days=30)).isoformat()
    for entry in index["captures"].values():
        entry["last_seen"] = stale_stamp
    store._write_index(index_key, index)

    assert store.lookup(url) is None


def test_identical_content_is_deduped(tmp_path):
    """Storing the same content twice creates a capture only the first time."""
    store = _store(tmp_path)
    url, html = "https://a.test", "<p>same</p>"

    original = store.store(_result(url, html))
    repeat = store.store(_result(url, html))

    assert original.created is True
    assert repeat.created is False
    assert original.capture.content_hash == repeat.capture.content_hash


def test_changed_content_creates_new_capture(tmp_path):
    """Different HTML for the same URL produces a new capture with a new hash."""
    store = _store(tmp_path)
    url = "https://a.test"

    before = store.store(_result(url, "<p>v1</p>"))
    after = store.store(_result(url, "<p>v2 changed</p>"))

    assert after.created is True
    assert before.capture.content_hash != after.capture.content_hash
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
from __future__ import annotations

from forecasting_tools.agents_and_tools.source_archive import manifest
from forecasting_tools.agents_and_tools.source_archive.config import ArchiveConfig
from forecasting_tools.agents_and_tools.source_archive.content_store import ContentStore
from forecasting_tools.agents_and_tools.source_archive.models import CitationRecord
from forecasting_tools.agents_and_tools.source_archive.pipeline import CapturePipeline
from forecasting_tools.agents_and_tools.source_archive.storage import LocalBlobStore


def _pipeline(tmp_path, fetcher) -> CapturePipeline:
    """Wire ``fetcher`` to a local-blob ``ContentStore`` with a 14-day TTL."""
    blobs = LocalBlobStore(tmp_path)
    config = ArchiveConfig(s3_prefix="t", ttl_days=14)
    return CapturePipeline(fetcher, ContentStore(blobs, config))


def test_manifest_roundtrip_and_unique_urls():
    """``dumps``/``loads`` keep URL order; ``unique_urls`` drops the repeat."""
    originals = [
        CitationRecord(url="https://a.test", run_id="r1", bot="b", tool_name="search"),
        CitationRecord(url="https://a.test", run_id="r1", bot="b", tool_name="fetch"),
        CitationRecord(url="https://b.test", run_id="r1", bot="b"),
    ]

    serialized = manifest.dumps(originals)
    restored = manifest.loads(serialized)

    assert [rec.url for rec in restored] == [rec.url for rec in originals]
    assert list(manifest.unique_urls(restored)) == ["https://a.test", "https://b.test"]


def test_manifest_blob_roundtrip(tmp_path):
    """A manifest written for run "r1" lands at the expected key and reads back."""
    blobs = LocalBlobStore(tmp_path)
    config = ArchiveConfig(s3_prefix="t")
    written = [CitationRecord(url="https://a.test", run_id="r1")]

    manifest.write_blob(blobs, "r1", written, config)

    assert blobs.exists("t/manifests/r1.jsonl")
    loaded = manifest.read_blob(blobs, "r1", config)
    assert loaded[0].url == "https://a.test"


def test_pipeline_stores_then_cache_hits(tmp_path, make_fetcher):
    """The first run stores the URL; a repeat run is a cache hit with no refetch."""
    url = "https://a.test"
    fetcher = make_fetcher()
    fetcher.add(url)
    pipeline = _pipeline(tmp_path, fetcher)

    first_run = pipeline.run([url])
    assert first_run.count("stored") == 1
    assert fetcher.calls == [url]

    second_run = pipeline.run([url])
    assert second_run.count("cache_hit") == 1
    # The call log is unchanged, i.e. the URL was not refetched.
    assert fetcher.calls == [url]


def test_pipeline_quality_failed_not_stored(tmp_path, make_fetcher):
    """A 404 capture is counted as ``quality_failed`` and nothing is captured."""
    url = "https://bad.test"
    fetcher = make_fetcher()
    fetcher.add(url, status_code=404)

    summary = _pipeline(tmp_path, fetcher).run([url])

    assert summary.count("quality_failed") == 1
    assert summary.captures == {}


def test_pipeline_error_when_no_backend_succeeds(tmp_path, make_fetcher):
    """A URL the fetcher cannot serve is counted as an ``error`` outcome."""
    # Nothing is registered on the fake, so its fetch raises FetchError.
    unprimed_fetcher = make_fetcher()
    pipeline = _pipeline(tmp_path, unprimed_fetcher)

    summary = pipeline.run(["https://missing.test"])

    assert summary.count("error") == 1


def test_pipeline_run_manifest_dedups_urls(tmp_path, make_fetcher):
    """Two manifest records sharing a URL produce one outcome and one fetch."""
    url = "https://a.test"
    fetcher = make_fetcher()
    fetcher.add(url)
    pipeline = _pipeline(tmp_path, fetcher)

    citations = [
        CitationRecord(url="https://a.test", tool_name="search"),
        CitationRecord(url="https://a.test", tool_name="fetch"),
    ]
    summary = pipeline.run_manifest(citations)

    assert len(summary.outcomes) == 1
    assert fetcher.calls == [url]
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from __future__ import annotations

from forecasting_tools.agents_and_tools.source_archive.fetchers.tiered import (
TieredFetcher,
)
from forecasting_tools.agents_and_tools.source_archive.models import CaptureResult
from forecasting_tools.agents_and_tools.source_archive.quality import evaluate


def _cap(**kw) -> CaptureResult:
    """Build a ``CaptureResult`` from baseline fields, overridden by ``kw``.

    The baseline is a status-200 capture of URL "u" with no HTML and a
    markdown body of 200 repeated "x " tokens.
    """
    fields = {
        "url": "u",
        "final_url": "u",
        "status_code": 200,
        "html": None,
        "markdown": "x " * 200,
        **kw,
    }
    return CaptureResult(**fields)


def test_quality_passes_real_page():
    """The baseline capture from ``_cap`` passes the quality check."""
    verdict = evaluate(_cap())
    assert verdict.passed


def test_quality_fails_404():
    """A capture with status code 404 does not pass the quality check."""
    verdict = evaluate(_cap(status_code=404))
    assert not verdict.passed


def test_quality_fails_thin_content():
    """A capture whose markdown is just "short" does not pass the quality check."""
    verdict = evaluate(_cap(markdown="short"))
    assert not verdict.passed


def test_quality_fails_block_page():
    """Cloudflare challenge text fails with a ``block_signature`` reason."""
    challenge_text = "Attention Required! | Cloudflare " * 20
    verdict = evaluate(_cap(markdown=challenge_text))
    assert not verdict.passed
    assert "block_signature" in verdict.reason


def test_tiered_falls_back_to_secondary_on_quality_fail(make_fetcher):
    """The secondary's capture is returned when the primary serves a JS-wall page."""
    url = "https://blocked.test"
    primary = make_fetcher("primary")
    primary.add(url, markdown="please enable javascript " * 20)
    secondary = make_fetcher("secondary")
    secondary.add(url)

    tiered = TieredFetcher(primary, secondary)
    result = tiered.fetch(url)

    assert result.fetcher == "secondary"
    assert result.metadata["quality_passed"] is True


def test_tiered_falls_back_on_fetch_error(make_fetcher):
    """The secondary's capture is returned when the primary raises ``FetchError``."""
    url = "https://x.test"
    # The primary has nothing registered, so its fetch raises FetchError.
    primary = make_fetcher("primary")
    secondary = make_fetcher("secondary")
    secondary.add(url)

    tiered = TieredFetcher(primary, secondary)
    result = tiered.fetch(url)

    assert result.fetcher == "secondary"


def test_tiered_returns_failed_capture_when_all_fail(make_fetcher):
    """With a 404 primary and a 500 secondary, the result is marked as failed."""
    url = "https://x.test"
    primary = make_fetcher("primary")
    primary.add(url, status_code=404)
    secondary = make_fetcher("secondary")
    secondary.add(url, status_code=500)

    tiered = TieredFetcher(primary, secondary)
    result = tiered.fetch(url)

    assert result.metadata["quality_passed"] is False
Loading
Loading