Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,35 @@ Expected usage patterns:
- Additional minor intentional deviations may be documented directly in the codebase.
Such intentional deviations should be marked with `REFERENCE PARITY` comments in the code.

## Review team

After every change or milestone, or when explicitly prompted, dispatch several fresh-context review agents set to the
MAXIMUM THINKING EFFORT to review your work:

- A subagent focusing on the FUNCTIONAL CORRECTNESS and ROBUSTNESS of the implementation.
- A subagent focusing on the ARCHITECTURAL CLEANLINESS, DESIGN PRACTICES, and CODE QUALITY.
- Distinct tools -- Codex, Antigravity (`agy`), Claude (check what's available, exclude yourself) --
focusing on CORRECTNESS only.

It is important that we use all available distinct tools to maximize the diversity of perspectives
and minimize blind spots. When all are done, review and consolidate their findings and act accordingly.
If behavioral defects are found, ensure extensive regression tests are introduced.

Repeat the review/refine loop until the agents return only trivial feedback (or none) for three (sic!) consecutive turns.
Here, "trivial feedback" means stylistic/inconsequential issues such as wording, formatting, trivial parameter
validation, or anything else that does not materially affect the correctness or maintainability of the codebase.
Iteration until no feedback has been attempted in the past but it is not practical because in the absence of significant
issues the review agents tend to degrade to nitpicking.
Hence, we stop iteration earlier, as soon as the feedback ceases to contain significant findings.

The requirement of multiple consecutive reviews with no significant findings is intended to improve the coverage.
We have seen in the past how a single review turn would come up blank while the next round (with zero code changes in
between) would dig up a critical defect. Hence, we repeat turns generously across distinct agents for maximum assurance.

Review agents in maximum thinking mode may go silent for a long time; set a generous timeout (at least 1 hour).
Some agents expect input from stdin when launched headless and may get hung if no input is given;
in those cases consider redirecting from `/dev/null` or something like that; read the docs to figure out usage.

## Documentation

The documentation must be concise and to the point, with a strong focus on "how to use" rather than "how it works".
Expand Down
2 changes: 1 addition & 1 deletion src/pycyphal2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async def main():
from ._transport import Transport as Transport
from ._transport import TransportArrival as TransportArrival

__version__ = "2.0.0.dev3"
__version__ = "2.0.0.dev4"

# pdoc needs __all__ to display re-exported members.
__all__ = [
Expand Down
107 changes: 89 additions & 18 deletions src/pycyphal2/_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from collections import OrderedDict
from collections.abc import Coroutine
import logging
import math
import os
Expand All @@ -28,7 +29,7 @@
deserialize_header,
)
from ._transport import SubjectWriter, Transport, TransportArrival
from ._api import Topic, Node, Publisher, Subscriber, Breadcrumb, Closable, Instant, Priority, SendError
from ._api import Topic, Node, Publisher, Subscriber, Breadcrumb, Closable, ClosedError, Instant, Priority, SendError
from ._api import SUBJECT_ID_PINNED_MAX

if TYPE_CHECKING:
Expand Down Expand Up @@ -58,6 +59,22 @@
U64_MASK = (1 << 64) - 1


def ack_is_last_attempt(current_ack_deadline_ns: int, current_ack_timeout: float, total_deadline_ns: int) -> bool:
"""True if doubling the ACK timeout would overrun the total deadline, so this is the last retry."""
next_ack_timeout_ns = round(current_ack_timeout * 2 * 1e9)
remaining_budget_ns = total_deadline_ns - current_ack_deadline_ns
return remaining_budget_ns < next_ack_timeout_ns


def ack_window(deadline_ns: int, ack_timeout: float) -> tuple[int, bool] | None:
"""Next reliable-delivery ACK window: (ack_deadline_ns, is_last_attempt), or None if past the deadline."""
now_ns = Instant.now().ns
if now_ns >= deadline_ns:
return None
ack_deadline_ns = min(deadline_ns, now_ns + round(ack_timeout * 1e9))
return ack_deadline_ns, ack_is_last_attempt(ack_deadline_ns, ack_timeout, deadline_ns)


class GossipScope(Enum):
UNICAST = auto()
BROADCAST = auto()
Expand Down Expand Up @@ -115,6 +132,24 @@ def _name_is_homeful(name: str) -> bool:
return name == "~" or name.startswith("~/")


def _is_valid_wire_name(name: str) -> bool:
"""True if `name` is a well-formed *resolved* wire topic name, as required of names received in gossip:
nonempty, length-bounded, printable ASCII (33-126), already normalized (no leading/trailing/duplicate
'/'), verbatim (no '*'/'>' pattern tokens), not homeful ('~'/'~/...'), and pin-free (no '#<id>' suffix).
The last two are stripped/expanded by resolve_name before a name reaches the wire, so their presence
means the gossip is unresolved/non-canonical and must not create a local topic."""
return (
bool(name)
and len(name) <= TOPIC_NAME_MAX
and "*" not in name
and ">" not in name
and not _name_is_homeful(name)
and _name_consume_pin_suffix(name)[1] is None
and all(33 <= ord(ch) <= 126 for ch in name)
and _name_normalize(name) == name
)


def resolve_name(
name: str, home: str, namespace: str, remaps: dict[str, str] | None = None
) -> tuple[str, int | None, bool]:
Expand Down Expand Up @@ -345,10 +380,8 @@ class PublishTracker:
"""Tracks a pending reliable publication awaiting ACKs."""

tag: int
deadline_ns: int
ack_event: asyncio.Event
acknowledged: bool = False
data: bytes | None = None
ack_timeout: float = ACK_BASELINE_DEFAULT_TIMEOUT
compromised: bool = False
remaining: set[int] = field(default_factory=set)
Expand Down Expand Up @@ -507,7 +540,6 @@ def __init__(self, transport: Transport, *, home: str, namespace: str) -> None:
self._remaps: dict[str, str] = {}
self._closed = False
self.loop = asyncio.get_running_loop()
self._now_mono = time.monotonic()
self._monitor_callbacks: dict[int, Callable[[Topic], None]] = {}
self._next_monitor_callback_id = 0

Expand Down Expand Up @@ -573,7 +605,31 @@ def namespace(self) -> str:
def transport(self) -> Transport:
return self._transport

def _raise_if_closed(self) -> None:
if self._closed:
raise ClosedError(f"Node '{self._home}' is closed")

def _spawn_detached(self, coro: Coroutine[Any, Any, None], what: str) -> None:
"""Fire-and-forget a short-lived send: skip when closed, and never let its exception go unobserved."""
if self._closed:
coro.close()
return

def _done(task: asyncio.Task[None]) -> None:
if task.cancelled():
return
ex = task.exception()
if ex is None:
return
if isinstance(ex, (SendError, OSError)):
_logger.debug("%s send failed: %s", what, ex)
else:
_logger.error("%s task crashed: %s", what, ex, exc_info=ex)

self.loop.create_task(coro).add_done_callback(_done)

def remap(self, spec: str | dict[str, str]) -> None:
self._raise_if_closed()
if isinstance(spec, str):
spec = dict(x.split("=", 1) for x in spec.split() if "=" in x)
assert isinstance(spec, dict)
Expand All @@ -584,6 +640,7 @@ def remap(self, spec: str | dict[str, str]) -> None:
def advertise(self, name: str) -> Publisher:
from ._publisher import PublisherImpl

self._raise_if_closed()
resolved, pin, verbatim = resolve_name(name, self._home, self._namespace, self._remaps)
if not verbatim:
raise ValueError("Cannot advertise on a pattern name")
Expand All @@ -602,6 +659,7 @@ def advertise(self, name: str) -> Publisher:
def subscribe(self, name: str, *, reordering_window: float | None = None) -> Subscriber:
from ._subscriber import SubscriberImpl

self._raise_if_closed()
resolved, pin, verbatim = resolve_name(name, self._home, self._namespace, self._remaps)
if pin is not None and not verbatim:
raise ValueError("Pattern names cannot be pinned")
Expand Down Expand Up @@ -637,6 +695,7 @@ def subscribe(self, name: str, *, reordering_window: float | None = None) -> Sub
return subscriber

def monitor(self, callback: Callable[[Topic], None]) -> Closable:
self._raise_if_closed()
callback_id = self._next_monitor_callback_id
self._next_monitor_callback_id += 1
self._monitor_callbacks[callback_id] = callback
Expand All @@ -653,6 +712,7 @@ def _notify_monitors(self, topic: Topic) -> None:
_logger.exception("monitor() callback failed for %s", topic)

async def scout(self, pattern: str) -> None:
self._raise_if_closed()
resolved, pin, _ = resolve_name(pattern, self._home, self._namespace, self._remaps)
if pin is not None:
raise ValueError("Cannot scout a pinned name/pattern")
Expand Down Expand Up @@ -800,12 +860,10 @@ def publish_tracker_release(topic: TopicImpl, tracker: PublishTracker) -> None:
tracker.remaining.clear()

@staticmethod
def prepare_publish_tracker(topic: TopicImpl, tag: int, deadline_ns: int, data: bytes) -> PublishTracker:
def prepare_publish_tracker(topic: TopicImpl, tag: int) -> PublishTracker:
tracker = PublishTracker(
tag=tag,
deadline_ns=deadline_ns,
ack_event=asyncio.Event(),
data=data,
)
tracker.ack_timeout = ACK_BASELINE_DEFAULT_TIMEOUT
for assoc in sorted(topic.associations.values(), key=lambda x: x.remote_id):
Expand Down Expand Up @@ -1036,14 +1094,6 @@ async def do_send() -> None:

root.scout_task = self.loop.create_task(do_send())

def send_scout(self, pattern: str) -> None:
"""Send a scout message to discover topics matching a pattern."""

async def do_send() -> None:
await self._send_scout_once(pattern)

self.loop.create_task(do_send())

# -- Message Dispatch --

def on_subject_arrival(self, subject_id: int, arrival: TransportArrival) -> None:
Expand All @@ -1055,6 +1105,8 @@ def on_unicast_arrival(self, arrival: TransportArrival) -> None:
self.dispatch_arrival(arrival, subject_id=None, unicast=True)

def dispatch_arrival(self, arrival: TransportArrival, *, subject_id: int | None, unicast: bool) -> None:
if self._closed:
return # Drop late arrivals after close instead of mutating state / spawning sends.
msg = arrival.message
if len(msg) < HEADER_SIZE:
_logger.debug("Drop short msg len=%d", len(msg))
Expand Down Expand Up @@ -1207,7 +1259,7 @@ async def do_send() -> None:
except (SendError, OSError) as e:
_logger.debug("ACK send failed: %s", e)

self.loop.create_task(do_send())
self._spawn_detached(do_send(), "ACK")

def on_msg_ack(self, arrival: TransportArrival, hdr: MsgAckHeader | MsgNackHeader) -> None:
topic = self.topics_by_hash.get(hdr.topic_hash)
Expand Down Expand Up @@ -1295,7 +1347,7 @@ async def do_send() -> None:
except (SendError, OSError) as e:
_logger.debug("RSP ACK send failed: %s", e)

self.loop.create_task(do_send())
self._spawn_detached(do_send(), "RSP ACK")

def on_gossip(
self,
Expand All @@ -1306,6 +1358,8 @@ def on_gossip(
) -> None:
name = ""
if hdr.name_len > 0:
# Best-effort decode for diagnostics/monitoring; an invalid name cannot create a topic because
# topic_subscribe_if_matching validates the character set before creating one.
name = payload[: hdr.name_len].decode("utf-8", errors="replace")

topic = self.topics_by_hash.get(hdr.topic_hash)
Expand Down Expand Up @@ -1374,6 +1428,14 @@ def topic_subscribe_if_matching(
now: float,
) -> TopicImpl | None:
"""Create an implicit topic if any pattern subscriber matches the name."""
# REFERENCE PARITY: the reference does not (yet) validate gossip-name characters here -- it trusts
# the hash. We additionally reject names that are not valid resolved wire names (non-normalized,
# non-verbatim by this implementation's rule, homeful, or pinned) so untrusted wire input cannot
# create a local topic with such a name. Consistent with the documented whitespace-strip deviation;
# the reference may adopt the same validation later.
if not _is_valid_wire_name(name):
_logger.debug("Gossip drop invalid wire name hash=%016x", topic_hash)
return None
# Validate that the hash matches the name to prevent corrupt gossip from creating inconsistencies.
if rapidhash(name) != topic_hash:
_logger.debug("Gossip hash mismatch for '%s': got %016x, expected %016x", name, topic_hash, rapidhash(name))
Expand All @@ -1398,12 +1460,15 @@ def topic_subscribe_if_matching(
def on_scout(self, arrival: TransportArrival, hdr: ScoutHeader, payload: bytes) -> None:
if hdr.pattern_len == 0 or hdr.pattern_len > TOPIC_NAME_MAX or len(payload) < hdr.pattern_len:
return
# Best-effort decode; an invalid pattern simply matches no local topic names.
pattern = payload[: hdr.pattern_len].decode("utf-8", errors="replace")
_logger.debug("Scout received pattern='%s' from %016x", pattern, arrival.remote_id)
for topic in list(self.topics_by_name.values()):
subs = match_pattern(pattern, topic.name)
if subs is not None:
self.loop.create_task(self.send_gossip_unicast(topic, arrival.remote_id, arrival.priority))
self._spawn_detached(
self.send_gossip_unicast(topic, arrival.remote_id, arrival.priority), "gossip unicast"
)

# -- Implicit Topic GC --

Expand Down Expand Up @@ -1474,6 +1539,12 @@ def close(self) -> None:
return
self._closed = True
_logger.info("Node closing home='%s'", self._home)
# Unblock anything awaiting on a subscriber (`async for`): closing each enqueues StopAsyncIteration,
# otherwise a default (no-liveness-timeout) subscriber would wait on its queue forever. (Reliable
# publishes / response streams are deadline-bounded and resolve on their own.)
for root in list(self.sub_roots_verbatim.values()) + list(self.sub_roots_pattern.values()):
for sub in list(root.subscribers):
sub.close()
self._gc_task.cancel()
for root in list(self.sub_roots_pattern.values()):
if root.scout_task is not None:
Expand Down
28 changes: 7 additions & 21 deletions src/pycyphal2/_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from ._api import DeliveryError, Instant, LivenessError, Priority, SendError
from ._api import Publisher, Topic, ResponseStream, Response
from ._header import MsgBeHeader, MsgRelHeader, RspBeHeader, RspRelHeader
from ._node import ACK_BASELINE_DEFAULT_TIMEOUT, NodeImpl, PublishTracker, SESSION_LIFETIME, TopicImpl
from ._node import ACK_BASELINE_DEFAULT_TIMEOUT, NodeImpl, PublishTracker, SESSION_LIFETIME, TopicImpl, ack_window
from ._transport import TransportArrival

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -124,7 +124,7 @@ async def request(
)
self._topic.request_futures[tag] = stream

tracker = self._prepare_reliable_publish_tracker(tag, delivery_deadline.ns, payload)
tracker = self._prepare_reliable_publish_tracker(tag)
try:
initial_window = await self._reliable_publish_start(delivery_deadline, tag, payload, tracker)
except asyncio.CancelledError:
Expand Down Expand Up @@ -169,12 +169,6 @@ async def _request_publish(
finally:
self._release_reliable_publish_tracker(tag, tracker)

@staticmethod
def _ack_is_last_attempt(current_ack_deadline_ns: int, current_ack_timeout: float, total_deadline_ns: int) -> bool:
next_ack_timeout_ns = round(current_ack_timeout * 2 * 1e9)
remaining_budget_ns = total_deadline_ns - current_ack_deadline_ns
return remaining_budget_ns < next_ack_timeout_ns

@staticmethod
def _ack_window_is_compromised(deadline_ns: int, current_ack_timeout: float) -> bool:
return Instant.now().ns >= (deadline_ns - round(current_ack_timeout * 1e9))
Expand All @@ -189,16 +183,8 @@ def _serialize_message(self, tag: int, payload: bytes, *, reliable: bool) -> byt
)
return hdr.serialize() + payload

@staticmethod
def _reliable_publish_window(deadline_ns: int, ack_timeout: float) -> tuple[int, bool] | None:
now_ns = Instant.now().ns
if now_ns >= deadline_ns:
return None
ack_deadline_ns = min(deadline_ns, now_ns + round(ack_timeout * 1e9))
return ack_deadline_ns, PublisherImpl._ack_is_last_attempt(ack_deadline_ns, ack_timeout, deadline_ns)

def _prepare_reliable_publish_tracker(self, tag: int, deadline_ns: int, payload: bytes) -> PublishTracker:
tracker = self._node.prepare_publish_tracker(self._topic, tag, deadline_ns, payload)
def _prepare_reliable_publish_tracker(self, tag: int) -> PublishTracker:
tracker = self._node.prepare_publish_tracker(self._topic, tag)
tracker.ack_timeout = self.ack_timeout
self._topic.publish_futures[tag] = tracker
return tracker
Expand Down Expand Up @@ -231,7 +217,7 @@ async def _reliable_publish_start(
payload: bytes,
tracker: PublishTracker,
) -> tuple[int, bool]:
initial_window = self._reliable_publish_window(deadline.ns, tracker.ack_timeout)
initial_window = ack_window(deadline.ns, tracker.ack_timeout)
if initial_window is None:
raise DeliveryError("Reliable publish not acknowledged before deadline")
ack_deadline_ns, _ = initial_window
Expand Down Expand Up @@ -277,7 +263,7 @@ async def _reliable_publish_continue(
if last_attempt:
break
tracker.ack_timeout *= 2
next_window = self._reliable_publish_window(deadline.ns, tracker.ack_timeout)
next_window = ack_window(deadline.ns, tracker.ack_timeout)
if next_window is None:
break
ack_deadline_ns, last_attempt = next_window
Expand All @@ -292,7 +278,7 @@ async def _reliable_publish_continue(
raise DeliveryError("Reliable publish not acknowledged before deadline")

async def _reliable_publish(self, deadline: Instant, tag: int, payload: bytes) -> None:
tracker = self._prepare_reliable_publish_tracker(tag, deadline.ns, payload)
tracker = self._prepare_reliable_publish_tracker(tag)
try:
initial_window = await self._reliable_publish_start(deadline, tag, payload, tracker)
await self._reliable_publish_continue(deadline, tag, payload, tracker, initial_window)
Expand Down
Loading