
OpenTelemetry: queue task metrics #759

Open
dahlia wants to merge 8 commits into fedify-dev:main from dahlia:otel/queue-metrics

Conversation

@dahlia
Member

@dahlia dahlia commented May 10, 2026

This adds OpenTelemetry metrics for Fedify queue task processing: inbox, outbox, and fanout. MessageQueue.getDepth() (added in #748) reports backend backlog depth; this PR adds the worker side, so operators can see how fast Fedify is draining the queue, how often workers fail, and how many tasks are in flight per process.

Closes #740. Part of #316.

Changes

  • Added six metric instruments in @fedify/fedify:

    • fedify.queue.task.enqueued (counter, {task})
    • fedify.queue.task.started (counter, {task})
    • fedify.queue.task.completed (counter, {task})
    • fedify.queue.task.failed (counter, {task})
    • fedify.queue.task.duration (histogram, ms)
    • fedify.queue.task.in_flight (up/down counter, {task})
  • Common attributes: fedify.queue.role (inbox, outbox, or fanout), fedify.queue.backend (queue.constructor.name verbatim, omitted when the constructor is the plain Object), fedify.queue.native_retrial, and activitypub.activity.type whenever Fedify knows the activity type. The enqueue counter additionally carries fedify.queue.task.attempt (zero-based) so retry re-enqueues are distinguishable from initial enqueues. The completion-side instruments carry fedify.queue.task.result (completed, failed, or aborted).

  • The in-flight up/down counter intentionally omits per-message attributes (activity type, attempt, result) so that increment and decrement operations always pair up cleanly per attribute series. It is process-local; cross-process aggregation is left to the metrics backend.

  • Aborts are detected via error?.name === "AbortError", which covers both DOMException aborts and aborts that arrive without a DOMException wrapper. Aborted tasks record result=aborted and do not increment fedify.queue.task.failed.

  • Instrumented enqueue boundaries: the inbound path through routeActivity() in packages/fedify/src/federation/inbox.ts, the outbound path through sendActivityInternal() and the queued forwardActivityInternal() in packages/fedify/src/federation/middleware.ts, and the fanout enqueue in Context.sendActivity(). Retry re-enqueues in #listenOutboxMessage() and #listenInboxMessage() record the metric only when the queue is configured.

  • Instrumented worker boundary: each branch of processQueuedTask() is wrapped with the started counter, an in-flight ±1 around the task body, and completed/failed/aborted plus duration on outcome. #listenInboxMessage() exposes the parsed activity type via an onActivityType callback so the inbox completion measurements can carry it.

  • Tests in packages/fedify/src/federation/middleware.test.ts cover the success, native-retrial failure, internally-handled retry, abort, fanout, and forwardActivity enqueue paths. packages/fedify/src/federation/handler.test.ts adds a test for the inbox enqueue metric through handleInbox().

  • Updated docs/manual/opentelemetry.md with the new instruments, attributes, semantic-attribute table entries, and a paragraph contrasting these process-side metrics with the backend-side MessageQueue.getDepth(). Cross-referenced from docs/manual/mq.md.

  • Updated CHANGES.md.
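
As a rough illustration of how the attribute sets described in the list above might be assembled, here is a minimal TypeScript sketch. The helper names (commonAttributes, enqueueAttributes, inFlightAttributes) and the QueueTaskCommon shape are assumptions made for this example and are not taken from metrics.ts; only the attribute keys come from this description.

```typescript
// Hypothetical stand-in types; the real definitions in
// packages/fedify/src/federation/metrics.ts may differ.
type QueueTaskRole = "inbox" | "outbox" | "fanout";
type Attributes = Record<string, string | number | boolean>;

interface QueueTaskCommon {
  role: QueueTaskRole;
  backend?: string; // left out when the queue is a plain object literal
  nativeRetrial: boolean;
  activityType?: string; // only set when the activity type is known
}

// Attributes shared by the queue task instruments.
function commonAttributes(task: QueueTaskCommon): Attributes {
  const attrs: Attributes = {
    "fedify.queue.role": task.role,
    "fedify.queue.native_retrial": task.nativeRetrial,
  };
  if (task.backend != null) attrs["fedify.queue.backend"] = task.backend;
  if (task.activityType != null) {
    attrs["activitypub.activity.type"] = task.activityType;
  }
  return attrs;
}

// The enqueue counter additionally carries the zero-based attempt number.
function enqueueAttributes(task: QueueTaskCommon, attempt: number): Attributes {
  return { ...commonAttributes(task), "fedify.queue.task.attempt": attempt };
}

// The in-flight series drops per-message attributes so that the +1 and the
// -1 for one task always land on the same attribute series.
function inFlightAttributes(task: QueueTaskCommon): Attributes {
  const attrs = commonAttributes(task);
  delete attrs["activitypub.activity.type"];
  return attrs;
}
```

If the in-flight series kept the activity type, an inbox task whose type only becomes known after parsing would increment one series and decrement another, which is the mismatch the omission is meant to avoid.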

Notes

When a queue does not declare nativeRetrial and Fedify catches an inbox listener or outbox delivery error to schedule a retry, the worker boundary records result=completed rather than failed, since processQueuedTask() returned without throwing. The retry remains observable as a new fedify.queue.task.enqueued measurement with non-zero fedify.queue.task.attempt, and outbox-side delivery failures stay observable through activitypub.delivery.* metrics and the activitypub.delivery.failed span event. Inbox listener errors that the retry policy abandons are visible through error logs and the inbox span's error status, but not through a dedicated metric. This is deliberate: completed/failed describe the worker boundary, not the activity-level outcome.
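
To make the distinction concrete, the following is a small synchronous sketch of the non-native-retrial case. It is not the code in middleware.ts: listenOutbox, processTask, and the in-memory measurements array are hypothetical stand-ins, and the real listeners are more involved. It only shows why a delivery error that the listener catches surfaces as result=completed plus a re-enqueue with a non-zero attempt.

```typescript
// Hypothetical in-memory recorder standing in for the OpenTelemetry counters.
type Attrs = Record<string, string | number>;
const measurements: { name: string; attrs: Attrs }[] = [];
function record(name: string, attrs: Attrs): void {
  measurements.push({ name, attrs });
}

interface OutboxTask {
  attempt: number;
}

// Listener for a queue without nativeRetrial: it catches the delivery error
// itself and schedules a retry, so nothing propagates to the worker boundary.
function listenOutbox(
  task: OutboxTask,
  deliver: () => void,
  enqueue: (retry: OutboxTask) => void,
): void {
  try {
    deliver();
  } catch {
    const retry: OutboxTask = { attempt: task.attempt + 1 };
    enqueue(retry);
    // Recorded only after the re-enqueue actually happened.
    record("fedify.queue.task.enqueued", {
      "fedify.queue.task.attempt": retry.attempt,
    });
  }
}

// Worker boundary: it classifies only what the listener throws.
function processTask(
  task: OutboxTask,
  deliver: () => void,
  enqueue: (retry: OutboxTask) => void,
): void {
  try {
    listenOutbox(task, deliver, enqueue);
    record("fedify.queue.task.completed", {
      "fedify.queue.task.result": "completed",
    });
  } catch (error) {
    record("fedify.queue.task.failed", { "fedify.queue.task.result": "failed" });
    throw error;
  }
}
```

In this sketch a failing deliver() yields one enqueued measurement with attempt 1 followed by one completed measurement, and no failed measurement.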

The issue's wording suggested an observable gauge for in_flight, but an UpDownCounter matches the synchronous start/end lifecycle: increment before processing, decrement in finally. It is also testable with the existing @fedify/fixture TestMeter.
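
The increment/decrement pairing can be sketched as follows. This is an illustration under stated assumptions rather than the actual processQueuedTask() body: InMemoryUpDownCounter, runTask, and outcomes are hypothetical names, and the task body is a plain synchronous function to keep the example short.

```typescript
type QueueTaskResult = "completed" | "failed" | "aborted";

// Minimal stand-in for an OpenTelemetry UpDownCounter (hypothetical).
class InMemoryUpDownCounter {
  value = 0;
  add(delta: number): void {
    this.value += delta;
  }
}

const inFlight = new InMemoryUpDownCounter();
const outcomes: { result: QueueTaskResult; durationMs: number }[] = [];

// Aborts are recognized by name, so errors named "AbortError" count whether
// or not they are DOMException instances.
function isAbortError(error: unknown): boolean {
  return (error as { name?: unknown } | null | undefined)?.name === "AbortError";
}

// Wraps one task body: +1 before processing, -1 in finally.
function runTask(body: () => void): void {
  const startedAt = Date.now();
  let result: QueueTaskResult = "completed";
  inFlight.add(1);
  try {
    body();
  } catch (error) {
    result = isAbortError(error) ? "aborted" : "failed";
    throw error;
  } finally {
    // Runs on success, failure, and abort alike, so the counter cannot drift.
    inFlight.add(-1);
    outcomes.push({ result, durationMs: Date.now() - startedAt });
  }
}
```

Because the decrement sits in finally, the counter returns to its previous value on every path, which is also what makes the behavior straightforward to assert in a test meter.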

The fedify.queue.backend attribute is the queue class's constructor.name as-is (RedisMessageQueue, InProcessMessageQueue, etc.) rather than a transformed short name, since application-controlled backend implementations should not be renamed by Fedify.
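
A minimal sketch of that rule, assuming a helper shaped roughly like the exported getQueueBackend(); the exact signature is a guess, and RedisMessageQueue here is a local stand-in class rather than the real backend:

```typescript
// Hypothetical sketch; the real getQueueBackend() in metrics.ts may differ.
function getQueueBackend(queue: object): string | undefined {
  // Objects created with a null prototype have no constructor at all.
  const name = queue.constructor?.name;
  // Plain object literals report "Object", which carries no information
  // about the backend, so the attribute is omitted for them.
  return name == null || name === "Object" ? undefined : name;
}

// Stand-in class used only to show the verbatim class name.
class RedisMessageQueue {
  enqueue(): void {}
}

const fromClass = getQueueBackend(new RedisMessageQueue());
const fromLiteral = getQueueBackend({ enqueue() {} });
```

Here fromClass is "RedisMessageQueue" and fromLiteral is undefined. One consequence of using the name as-is: a build step that mangles class names would also change the attribute value.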

Verification

  • mise check
  • mise test:deno
  • mise test:node

dahlia added 2 commits May 10, 2026 13:45
Issue fedify-dev#740 asked for metrics covering Fedify's enqueue and worker
boundaries for inbox, outbox, and fanout work.  Backlog depth from
MessageQueue.getDepth() answers "how much is waiting"; these metrics
answer "how fast are workers draining it, how often are they
failing, and how many tasks are running right now in this process".

This change adds six instruments to FederationMetrics:

  fedify.queue.task.enqueued    Counter, every Fedify enqueue
                                including retries.
  fedify.queue.task.started     Counter, when a worker begins.
  fedify.queue.task.completed   Counter, when processing returns
                                without throwing.
  fedify.queue.task.failed      Counter, when processing throws a
                                non-abort error.
  fedify.queue.task.duration    Histogram in ms.
  fedify.queue.task.in_flight   UpDownCounter, process-local; the
                                in-flight series intentionally omits
                                per-message attributes so increments
                                and decrements pair up cleanly.

Common attributes are fedify.queue.role, fedify.queue.backend
(queue.constructor.name verbatim, omitted for plain object literals),
fedify.queue.native_retrial, and activitypub.activity.type when
available.  The enqueue counter additionally carries
fedify.queue.task.attempt; the completion-side instruments carry
fedify.queue.task.result (completed, failed, or aborted).  Aborts are
detected via error?.name === "AbortError".

Instrumentation points:

  - Initial inbox enqueue in routeActivity().
  - Initial outbox enqueue in sendActivityInternal() and queued
    forwardActivityInternal().
  - Initial fanout enqueue in Context.sendActivity().
  - Retry re-enqueues in #listenOutboxMessage() and
    #listenInboxMessage(), guarded so the metric only fires when the
    enqueue actually happens.
  - All three branches of processQueuedTask() are wrapped with
    started/in-flight/completed-or-failed-or-aborted/duration.

#listenInboxMessage() now exposes the parsed activity type to its
caller via an onActivityType callback so processQueuedTask can attach
activitypub.activity.type to the inbox-task completion measurements.

When a queue does not declare nativeRetrial and Fedify catches an
inbox listener or outbox delivery error to schedule a retry, the
worker boundary records result=completed; the retry is observable as
a new fedify.queue.task.enqueued measurement with non-zero
fedify.queue.task.attempt, and outbox-side delivery failures remain
observable through activitypub.delivery.* and the
activitypub.delivery.failed span event.  The OpenTelemetry manual
page now documents this contrast explicitly.

Tests cover success, native-retrial failure, internally-handled retry,
abort, and forwardActivity enqueue paths.  CHANGES.md, the
OpenTelemetry manual, and the message queue manual are updated.

fedify-dev#316
Closes fedify-dev#740

Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: Codex:gpt-5.2-codex

Improve the formatting of the OpenTelemetry queue task metrics list in
CHANGES.md and add a reference to pull request fedify-dev#759.
@dahlia dahlia added this to the Fedify 2.3 milestone May 10, 2026
@dahlia dahlia self-assigned this May 10, 2026
@dahlia dahlia added the type/enhancement (Improvements to existing features), type/feature (Requests for new features), component/federation (Federation object related), component/mq (Message queue related), and component/otel (OpenTelemetry integration) labels May 10, 2026
@dahlia
Member Author

dahlia commented May 10, 2026

@codex review

@coderabbitai

coderabbitai Bot commented May 10, 2026


Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Walkthrough

Adds OpenTelemetry queue task metrics (enqueued/started/completed/failed/duration/in_flight), integrates recording calls into enqueue and processing paths for inbox/outbox/fanout, adds types and recording methods to FederationMetrics, updates tests to assert emitted metrics, and documents the new metrics.

Changes

Queue Task Observability

| Layer | File(s) | Summary |
| --- | --- | --- |
| Queue Task Types | packages/fedify/src/federation/metrics.ts | Introduces exported types QueueTaskRole (fanout, outbox, or inbox) and QueueTaskResult (completed, failed, or aborted) and the QueueTaskCommonAttributes interface; imports UpDownCounter. |
| FederationMetrics Instruments | packages/fedify/src/federation/metrics.ts | Extends FederationMetrics with instrument fields for enqueued/started/completed/failed counters, duration histogram, and in-flight UpDownCounter; constructor creates these instruments. |
| Recording Methods & Helpers | packages/fedify/src/federation/metrics.ts | Implements recordQueueTaskEnqueued, recordQueueTaskStarted, incrementQueueTaskInFlight, decrementQueueTaskInFlight, recordQueueTaskOutcome; adds helpers to build queue-task attributes and exports getQueueBackend, recordOutboxEnqueue, and isAbortError. |
| Enqueue-Time Metrics | packages/fedify/src/federation/inbox.ts, packages/fedify/src/federation/middleware.ts | Integrates queue-task enqueue recording at inbox handler, inbox/outbox retry enqueue flows, fanout enqueue, and per-message/batch outbox enqueue paths; threads meterProvider into routeActivity/sendActivity. |
| Processing-Time Metrics | packages/fedify/src/federation/middleware.ts | Records started, in-flight increment/decrement, duration, and outcome metrics in processQueuedTask for fanout/outbox/inbox; passes activityType callback to inbox listener for metric tagging. |
| Test Assertions | packages/fedify/src/federation/handler.test.ts, packages/fedify/src/federation/middleware.test.ts | Adds meter provider fixtures and assertions verifying enqueue/started/completed/failed/duration/in_flight metrics across handler, forwardActivity, processQueuedTask, and ContextImpl routing; covers success, failure, retry, and abort scenarios. |
| Documentation | CHANGES.md, docs/manual/mq.md, docs/manual/opentelemetry.md | Documents new fedify.queue.task.* instruments and attributes in changelog and manual; clarifies queue depth reporting covers only backend backlog and expands semantic attribute definitions. |

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant QueueBackend
  participant Worker
  participant FederationMetrics
  Client->>QueueBackend: enqueue message
  QueueBackend->>Worker: deliver queued message
  Worker->>FederationMetrics: recordQueueTaskStarted(common)
  Worker->>Worker: process task
  Worker->>FederationMetrics: recordQueueTaskOutcome(common, result, duration)
  Worker->>FederationMetrics: decrementQueueTaskInFlight(common)

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related issues

  • #740: Implements the queue task processing metrics requested (enqueue, started, completed, failed, duration, in‑flight).

Suggested labels

component/inbox, component/outbox

Suggested reviewers

  • sij411
  • 2chanhaeng
🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 42.86%, which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Title check | ✅ Passed | The title 'OpenTelemetry: queue task metrics' clearly and concisely summarizes the main change: addition of OpenTelemetry metrics for queue task processing. |
| Description check | ✅ Passed | The description provides comprehensive coverage of changes including new metric instruments, attributes, implementation details, testing, and documentation updates directly related to the changeset. |
| Linked Issues check | ✅ Passed | The PR fully implements the objectives from issue #740: adds all six proposed instruments (enqueued, started, completed, failed, duration, in_flight), includes all specified attributes, instruments enqueue and worker boundaries, and provides comprehensive tests and documentation. |
| Out of Scope Changes check | ✅ Passed | All changes are directly aligned with the linked issue objectives: queue task metric instrumentation, documentation updates, and related tests. No unrelated modifications detected. |




@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/fedify/src/federation/handler.test.ts`:
- Around line 2543-2550: The test currently checks attributes on the enqueued
metric but should also assert the metric type and that the backend attribute is
absent: after calling recorder.getMeasurements("fedify.queue.task.enqueued") and
selecting enqueued[0], add an assertion that its descriptor.type (or the
measurement type property used in tests) equals the expected metric type (e.g.,
"counter" to match other tests like the one around line 2670) and an assertion
that enqueued[0].attributes does not have a "fedify.queue.backend" key (or that
enqueued[0].attributes["fedify.queue.backend"] is undefined) so plain Object metrics
omit backend. Ensure you reference the existing recorder.getMeasurements and
enqueued[0] when adding these assertions.

In `@packages/fedify/src/federation/inbox.ts`:
- Around line 181-184: ContextImpl.routeActivityInternal() calls routeActivity()
without passing the optional federation meter provider, so
getFederationMetrics(...).recordQueueTaskEnqueued ends up using the global
provider; update ContextImpl.routeActivityInternal to forward
this.federation.meterProvider when invoking routeActivity (and any internal
calls that reach the programmatic routeActivity path) so that routeActivity uses
the configured meter provider; specifically ensure the routeActivity(...)
invocation includes the federation.meterProvider argument and that
routeActivity’s signature/prop forwarding (if needed) accepts and passes that
provider through to getFederationMetrics.

In `@packages/fedify/src/federation/metrics.ts`:
- Around line 125-131: recordQueueTaskOutcome currently increments
fedify.queue.task.completed only when outcome === "completed", causing failed
and aborted terminal attempts to be omitted; update recordQueueTaskOutcome (and
the similar block around the second occurrence noted) so that
fedify.queue.task.completed is incremented for every terminal outcome
("completed", "failed", "aborted"), using the same label set (including result
label) you use for fedify.queue.task.result/duration, i.e., move or duplicate
the counter.increment call out of the "completed"-only branch and call it with {
result: outcome, queue: ..., worker: ... } so each terminal attempt is counted.

In `@packages/fedify/src/federation/middleware.test.ts`:
- Around line 4406-4468: Rename or update the test step to remove "enqueue" from
the title or explicitly assert there are no enqueue metrics: after calling
federation.processQueuedTask(...) in the test ("records outbox enqueue and
worker metrics"), add an assertion that
recorder.getMeasurements("fedify.queue.task.enqueued").length is 0 to ensure no
spurious enqueue metric is recorded; alternatively change the step title to
"records outbox worker metrics" if you prefer to not add the assertion. Ensure
the check is adjacent to the existing started/completed assertions and
references the same recorder used in this test.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 7bc87e07-7d0d-4f4e-98c9-8c809dc0bc05

📥 Commits

Reviewing files that changed from the base of the PR and between 7415710 and f68096f.

📒 Files selected for processing (8)
  • CHANGES.md
  • docs/manual/mq.md
  • docs/manual/opentelemetry.md
  • packages/fedify/src/federation/handler.test.ts
  • packages/fedify/src/federation/inbox.ts
  • packages/fedify/src/federation/metrics.ts
  • packages/fedify/src/federation/middleware.test.ts
  • packages/fedify/src/federation/middleware.ts

Comment thread packages/fedify/src/federation/handler.test.ts
Comment thread packages/fedify/src/federation/inbox.ts
Comment thread packages/fedify/src/federation/metrics.ts
Comment thread packages/fedify/src/federation/middleware.test.ts Outdated
@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Keep them coming!


@codecov

codecov Bot commented May 10, 2026

Codecov Report

❌ Patch coverage is 98.29932% with 5 lines in your changes missing coverage. Please review.
✅ All tests successful. No failed tests found.

| Files with missing lines | Patch % | Lines |
| --- | --- | --- |
| packages/fedify/src/federation/middleware.ts | 97.53% | 0 Missing and 4 partials ⚠️ |
| packages/fedify/src/federation/metrics.ts | 99.22% | 0 Missing and 1 partial ⚠️ |

| Files with missing lines | Coverage Δ |
| --- | --- |
| packages/fedify/src/federation/inbox.ts | 86.01% <100.00%> (+10.22%) ⬆️ |
| packages/fedify/src/federation/metrics.ts | 99.66% <99.22%> (-0.34%) ⬇️ |
| packages/fedify/src/federation/middleware.ts | 96.20% <97.53%> (+0.18%) ⬆️ |

... and 3 files with indirect coverage changes


Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a comprehensive set of OpenTelemetry metrics for Fedify's message queue tasks, covering enqueueing, starting, completion, failure, duration, and in-flight tasks across inbox, outbox, and fanout roles. It includes updates to the core federation logic to instrument these metrics, detailed documentation in the manual, and extensive test coverage for various success and failure scenarios. I have no feedback to provide.

dahlia added 2 commits May 10, 2026 14:42
ContextImpl.routeActivityInternal() called the routeActivity() helper
in inbox.ts without passing this.federation.meterProvider, so the
fedify.queue.task.enqueued counter recorded from the public
Context.routeActivity() API path was emitted on the global
MeterProvider instead of the configured one.  The fetch-driven
inbox enqueue path through handleInbox() forwarded the provider
correctly, so this only affected callers using ctx.routeActivity()
directly.

Forward this.federation.meterProvider into the routeActivity() call
and add a regression test that exercises the path with a configured
test meter provider.
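
The failure mode can be sketched with stand-in types. Everything below is hypothetical (MeterProviderLike, getMetrics, and the two routeActivityInternal variants are names invented for this example); it only illustrates how an unforwarded optional provider silently falls back to the global one.

```typescript
// Hypothetical stand-in: a "provider" here is just a named sink.
interface MeterProviderLike {
  name: string;
  recorded: string[];
}

const globalProvider: MeterProviderLike = { name: "global", recorded: [] };

// Falls back to the global provider when the caller passes none.
function getMetrics(meterProvider?: MeterProviderLike): MeterProviderLike {
  return meterProvider ?? globalProvider;
}

function routeActivity(options: { meterProvider?: MeterProviderLike }): void {
  getMetrics(options.meterProvider).recorded.push("fedify.queue.task.enqueued");
}

interface FederationLike {
  meterProvider: MeterProviderLike;
}

// Before the fix: the configured provider is never forwarded.
function routeActivityInternalBefore(_federation: FederationLike): void {
  routeActivity({});
}

// After the fix: the configured provider reaches routeActivity().
function routeActivityInternalAfter(federation: FederationLike): void {
  routeActivity({ meterProvider: federation.meterProvider });
}
```

With a configured provider, the "before" variant records on globalProvider and the "after" variant records on the configured one, which is the behavior the regression test is described as exercising.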

fedify-dev#759 (comment)

Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: Codex:gpt-5.2-codex

Two CodeRabbit nitpicks on the queue task metric tests:

1. handleInbox() inbox-enqueue test (handler.test.ts) now also asserts
   the measurement's instrument type is "counter", and that the
   fedify.queue.backend attribute is undefined when the queue is an
   object literal (whose constructor.name is "Object" and is filtered
   by getQueueBackend()).
2. processQueuedTask() outbox worker step (middleware.test.ts) is
   renamed to "records outbox worker metrics on successful delivery",
   since the body asserts only worker-side started/completed and never
   exercised the enqueue counter.  An assertion that no enqueue
   measurement is emitted on the success path is added so accidental
   double-counting in #listenOutboxMessage() would fail this test.

fedify-dev#759 (comment)
fedify-dev#759 (comment)

Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: Codex:gpt-5.2-codex
@dahlia
Member Author

dahlia commented May 10, 2026

/gemini review

@dahlia
Member Author

dahlia commented May 10, 2026

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Can't wait for the next one!


Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a comprehensive suite of OpenTelemetry metrics for Fedify's queue system, covering inbox, outbox, and fanout roles. It adds counters for task lifecycle events (enqueued, started, completed, failed), a histogram for processing duration, and an up/down counter for tracking in-flight tasks. The implementation includes core logic updates, detailed documentation, and extensive unit tests. Feedback suggests improving the duration histogram by defining explicit bucket boundaries for better aggregation and notes a limitation in correlating 'started' metrics with activity types for inbox tasks since parsing occurs after the task begins.

Comment thread packages/fedify/src/federation/metrics.ts
Comment thread packages/fedify/src/federation/middleware.ts

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
packages/fedify/src/federation/middleware.ts (1)

1387-1460: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick win

Consolidate the two per-message enqueue branches in sendActivity.

Lines 1397–1419 (when enqueueMany == null) and lines 1423–1447 (when enqueueMany exists but orderingKey != null) are nearly identical: both do Promise.allSettled over per-message outboxQueue.enqueue calls with the same error aggregation. The sibling implementation in forwardActivityInternal (line 3394) already collapses these cases with outboxQueue.enqueueMany == null || orderingKey != null. Aligning here removes ~25 duplicated lines and one slightly diverging error path.

♻️ Proposed consolidation
-    if (outboxQueue.enqueueMany == null) {
+    if (outboxQueue.enqueueMany == null || orderingKey != null) {
       const promises: PromiseSettledResult<void>[] = await Promise.allSettled(
         messages.map(async (m) => {
           await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey });
           recordOutboxEnqueued(m.message);
         }),
       );
-      const errors = promises
+      const errors: unknown[] = promises
         .filter((r) => r.status === "rejected")
         .map((r) => (r as PromiseRejectedResult).reason);
       if (errors.length > 0) {
         logger.error(
           "Failed to enqueue activity {activityId} to send later: {errors}",
           { activityId: activity.id!.href, errors },
         );
         if (errors.length > 1) {
           throw new AggregateError(
             errors,
             `Failed to enqueue activity ${activityId} to send later.`,
           );
         }
         throw errors[0];
       }
     } else {
-      // Note: enqueueMany does not support per-message orderingKey,
-      // so we fall back to individual enqueues when orderingKey is specified
-      if (orderingKey != null) {
-        const promises: PromiseSettledResult<void>[] = await Promise.allSettled(
-          messages.map(async (m) => {
-            await outboxQueue.enqueue(m.message, {
-              orderingKey: m.orderingKey,
-            });
-            recordOutboxEnqueued(m.message);
-          }),
-        );
-        const errors = promises
-          .filter((r) => r.status === "rejected")
-          .map((r) => (r as PromiseRejectedResult).reason);
-        if (errors.length > 0) {
-          logger.error(
-            "Failed to enqueue activity {activityId} to send later: {errors}",
-            { activityId: activity.id!.href, errors },
-          );
-          if (errors.length > 1) {
-            throw new AggregateError(
-              errors,
-              `Failed to enqueue activity ${activityId} to send later.`,
-            );
-          }
-          throw errors[0];
-        }
-      } else {
-        try {
-          await outboxQueue.enqueueMany(messages.map((m) => m.message));
-        } catch (error) {
-          logger.error(
-            "Failed to enqueue activity {activityId} to send later: {error}",
-            { activityId: activity.id!.href, error },
-          );
-          throw error;
-        }
-        for (const m of messages) recordOutboxEnqueued(m.message);
-      }
+      try {
+        await outboxQueue.enqueueMany(messages.map((m) => m.message));
+      } catch (error) {
+        logger.error(
+          "Failed to enqueue activity {activityId} to send later: {error}",
+          { activityId: activity.id!.href, error },
+        );
+        throw error;
+      }
+      for (const m of messages) recordOutboxEnqueued(m.message);
     }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/fedify/src/federation/middleware.ts` around lines 1387 - 1460, The
two nearly identical per-message enqueue branches in sendActivity should be
consolidated: change the conditional that currently checks
outboxQueue.enqueueMany == null (and the separate orderingKey != null branch)
into a single branch using if (outboxQueue.enqueueMany == null || orderingKey !=
null) and run the Promise.allSettled over messages calling outboxQueue.enqueue
with orderingKey, invoking recordOutboxEnqueued for each, then aggregate errors
exactly as done today (logger.error, throw AggregateError when >1, else throw
the single error). Keep the else path that calls
outboxQueue.enqueueMany(messages.map(m => m.message)) with its try/catch and
post-success recordOutboxEnqueued loop unchanged; remove the duplicate
Promise.allSettled block previously duplicated in sendActivity.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 1387-1396: The duplicate recordOutboxEnqueued closure used in
sendActivity and forwardActivityInternal should be extracted to a single
reusable function; add a new free function (e.g.,
recordOutboxEnqueued(meterProvider, outboxQueue, message: OutboxMessage)) in
metrics.ts (or a method on FederationImpl) that calls
getFederationMetrics(meterProvider).recordQueueTaskEnqueued({ role: "outbox",
queue: outboxQueue, activityType: message.activityType }, message.attempt); then
replace both local closures (the ones named recordOutboxEnqueued in sendActivity
and forwardActivityInternal) with calls to that new function, keeping parameter
names meterProvider, outboxQueue and message to make the swap mechanical and
ensure future attribute additions only require one update site.
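
A sketch of what the suggested extraction could look like, using stand-in types so it runs on its own. OutboxMessageLike, MessageQueueLike, FederationMetricsSketch, and this getFederationMetrics() are hypothetical simplifications; only the shape of the recordOutboxEnqueued() call follows the comment above.

```typescript
// Hypothetical stand-in types for this sketch.
interface OutboxMessageLike {
  activityType: string;
  attempt: number;
}
interface MessageQueueLike {
  enqueue(message: OutboxMessageLike): void;
}
interface EnqueuedMeasurement {
  role: "outbox";
  queue: MessageQueueLike;
  activityType: string;
  attempt: number;
}

class FederationMetricsSketch {
  readonly enqueued: EnqueuedMeasurement[] = [];
  recordQueueTaskEnqueued(
    common: Omit<EnqueuedMeasurement, "attempt">,
    attempt: number,
  ): void {
    this.enqueued.push({ ...common, attempt });
  }
}

// One metrics object per provider (an assumption of this sketch).
const metricsByProvider = new WeakMap<object, FederationMetricsSketch>();
function getFederationMetrics(meterProvider: object): FederationMetricsSketch {
  let metrics = metricsByProvider.get(meterProvider);
  if (metrics == null) {
    metrics = new FederationMetricsSketch();
    metricsByProvider.set(meterProvider, metrics);
  }
  return metrics;
}

// Single shared helper replacing the two local closures.
function recordOutboxEnqueued(
  meterProvider: object,
  outboxQueue: MessageQueueLike,
  message: OutboxMessageLike,
): void {
  getFederationMetrics(meterProvider).recordQueueTaskEnqueued(
    { role: "outbox", queue: outboxQueue, activityType: message.activityType },
    message.attempt,
  );
}
```

Both sendActivity and forwardActivityInternal would then call recordOutboxEnqueued(meterProvider, outboxQueue, m.message) after each successful enqueue, so a future attribute change touches one place.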


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 0c8aae44-43ba-4679-8df0-fc189902d315

📥 Commits

Reviewing files that changed from the base of the PR and between f68096f and 643c235.

📒 Files selected for processing (3)
  • packages/fedify/src/federation/handler.test.ts
  • packages/fedify/src/federation/middleware.test.ts
  • packages/fedify/src/federation/middleware.ts

Comment thread packages/fedify/src/federation/middleware.ts Outdated
dahlia added 2 commits May 10, 2026 15:06
The per-message enqueue branch that ran when the queue lacked
enqueueMany() was structurally identical to the one that ran when
enqueueMany() existed but the caller specified an orderingKey: both
fell back to individual enqueue() calls and recorded
recordOutboxEnqueued() for each.  Folding both conditions into a single
fallback (`enqueueMany == null || orderingKey != null`) removes a copy
of the Promise.allSettled error-aggregation block, leaving the
enqueueMany() success path untouched.

No behavior change.

Assisted-by: Claude Code:claude-opus-4-7
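
The folded control flow described in this commit can be sketched roughly as follows. This is a minimal illustration, not the actual middleware.ts code: the `MessageQueue`, `OutboxMessage`, and `EnqueueOptions` shapes are trimmed-down stand-ins, `enqueueOutboxMessages` and `recordEnqueued` are hypothetical names, and the `logger.error` call mentioned in the review is left out.

```typescript
// Hypothetical minimal shapes; the real Fedify types carry more fields.
interface OutboxMessage {
  activityType: string;
  attempt: number;
}

interface EnqueueOptions {
  orderingKey?: string;
}

interface MessageQueue {
  enqueue(message: OutboxMessage, options?: EnqueueOptions): Promise<void>;
  enqueueMany?(messages: OutboxMessage[]): Promise<void>;
}

async function enqueueOutboxMessages(
  queue: MessageQueue,
  messages: OutboxMessage[],
  orderingKey: string | undefined,
  recordEnqueued: (message: OutboxMessage) => void,
): Promise<void> {
  if (queue.enqueueMany == null || orderingKey != null) {
    // Single fallback: per-message enqueue() when bulk enqueue is unavailable
    // or when the caller supplied an ordering key.
    const results = await Promise.allSettled(
      messages.map(async (message) => {
        await queue.enqueue(message, { orderingKey });
        recordEnqueued(message); // count only messages the queue accepted
      }),
    );
    const errors = results
      .filter((r): r is PromiseRejectedResult => r.status === "rejected")
      .map((r) => r.reason);
    if (errors.length > 1) {
      throw new AggregateError(errors, "Failed to enqueue multiple messages.");
    } else if (errors.length === 1) {
      throw errors[0];
    }
    return;
  }
  // Bulk path: record metrics only after enqueueMany() has succeeded.
  await queue.enqueueMany(messages);
  for (const message of messages) recordEnqueued(message);
}
```

Recording inside the per-message callback, and only after `enqueueMany()` resolves on the bulk path, keeps the enqueue counter from counting messages the backend rejected.
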
Use the same OpenTelemetry HTTP server semantic-conventions buckets
(5 ms to 10 s) that fedify.http.server.request.duration already uses.
Queue task durations span a similar range: in-process inbox listeners
finish in single-digit milliseconds, while outbox delivery to a slow
remote inbox can take several seconds.  Without explicit buckets,
exporters fall back to backend-specific defaults that vary across
Prometheus, OTLP/Tempo, and Datadog and make latency dashboards
inconsistent across deployments.

fedify-dev#759 (comment)

Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: Gemini Code Assist
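
For reference, the HTTP server duration boundaries from the OpenTelemetry semantic conventions, converted to milliseconds, look like the sketch below. The constant and function names are hypothetical, and `bucketIndex` only illustrates how a duration maps onto these boundaries (upper bounds inclusive); in the OpenTelemetry JS API such a list is normally passed as the `advice.explicitBucketBoundaries` option of `createHistogram()`, though how this PR wires it is not shown here.

```typescript
// Boundaries of the OpenTelemetry HTTP server duration convention,
// converted from seconds to milliseconds (5 ms to 10 s).
// The constant name is hypothetical.
const QUEUE_TASK_DURATION_BUCKETS_MS: readonly number[] = [
  5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000,
];

// Returns the index of the bucket a duration falls into.  Explicit-bucket
// histograms treat each boundary as an inclusive upper bound, and anything
// above the last boundary lands in a final overflow bucket.
function bucketIndex(
  durationMs: number,
  boundaries: readonly number[],
): number {
  const index = boundaries.findIndex((upper) => durationMs <= upper);
  return index < 0 ? boundaries.length : index;
}
```

With these boundaries, a single-digit-millisecond inbox listener and a multi-second outbox delivery land in clearly separated buckets regardless of the exporter's defaults.
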

@coderabbitai coderabbitai Bot left a comment

♻️ Duplicate comments (1)
packages/fedify/src/federation/middleware.ts (1)

1387-1396: 🧹 Nitpick | 🔵 Trivial | 💤 Low value

Duplicate of past review comment.

This recordOutboxEnqueued helper and the one in forwardActivityInternal (lines 3358–3367) are structurally identical; consolidating them was already raised. Flagging here only for tracking.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/fedify/src/federation/middleware.ts` around lines 1387 - 1396,
Duplicate helper functions exist: consolidate the two identical
recordOutboxEnqueued implementations into a single shared helper and call it
from both places. Extract the logic that calls
getFederationMetrics(...).recordQueueTaskEnqueued(...) (currently inside
recordOutboxEnqueued and the version inside forwardActivityInternal) into one
exported/top-level function (e.g., recordOutboxEnqueued or
recordQueueTaskEnqueuedHelper), preserve the same parameters (message:
OutboxMessage, outboxQueue, meterProvider context), replace the duplicate blocks
with calls to that shared function, and ensure imports/exports are updated where
needed so both call sites use the single implementation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 4f80b376-8728-446b-9b0d-539008648bb6

📥 Commits

Reviewing files that changed from the base of the PR and between 643c235 and f591b5c.

📒 Files selected for processing (1)
  • packages/fedify/src/federation/middleware.ts

@dahlia
Member Author

dahlia commented May 10, 2026

/gemini review

@dahlia
Member Author

dahlia commented May 10, 2026

@codex review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces OpenTelemetry metrics for Fedify's queue tasks, enabling monitoring of enqueueing, processing, and in-flight tasks across inbox, outbox, and fanout roles. The changes include new metric instruments in the FederationMetrics class, middleware integration, and updated documentation. Review feedback identifies performance bottlenecks in batch operations where metric attributes are redundantly calculated, recommending that these be pre-calculated outside of loops to reduce overhead.
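
The pre-calculation suggested in this review might look something like the sketch below. It is an illustration under assumptions: `CounterLike`, `QueueLike`, `buildQueueAttributes`, and `recordBatchEnqueued` are hypothetical names, and only the attribute keys come from the PR description.

```typescript
type Attributes = Record<string, string | number | boolean>;

// Minimal stand-ins for an OpenTelemetry counter and a Fedify queue.
interface CounterLike {
  add(value: number, attributes?: Attributes): void;
}

interface QueueLike {
  nativeRetrial?: boolean;
}

// Attributes that are identical for every message in a batch.
function buildQueueAttributes(role: string, queue: QueueLike): Attributes {
  const attributes: Attributes = {
    "fedify.queue.role": role,
    "fedify.queue.native_retrial": queue.nativeRetrial ?? false,
  };
  const backend = (queue as object).constructor.name;
  // The backend name is omitted when the constructor is the plain Object.
  if (backend !== "Object") attributes["fedify.queue.backend"] = backend;
  return attributes;
}

function recordBatchEnqueued(
  counter: CounterLike,
  role: string,
  queue: QueueLike,
  messages: readonly { activityType: string; attempt: number }[],
): void {
  // Computed once per batch instead of once per message.
  const shared = buildQueueAttributes(role, queue);
  for (const message of messages) {
    counter.add(1, {
      ...shared,
      "activitypub.activity.type": message.activityType,
      "fedify.queue.task.attempt": message.attempt,
    });
  }
}
```
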

Comment thread packages/fedify/src/federation/middleware.ts Outdated
Comment thread packages/fedify/src/federation/middleware.ts Outdated
@chatgpt-codex-connector

Codex Review: Didn't find any major issues. You're on a roll.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

sendActivityInternal() and forwardActivityInternal() each defined an
identical recordOutboxEnqueued() closure that just forwarded
(activityType, attempt) to FederationMetrics.recordQueueTaskEnqueued()
under role=outbox.  Move the body to a single top-level function in
metrics.ts, so future attribute additions only need one update site.

The shared helper takes the meter provider, the outbox queue, and a
structural { activityType, attempt } subset of OutboxMessage.  Keeping
the parameter as a structural type means metrics.ts does not need to
import from queue.ts.

fedify-dev#759 (comment)

Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: CodeRabbit
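
A rough sketch of what such a shared helper could look like is given below. Everything apart from the names `recordOutboxEnqueued`, `getFederationMetrics`, and `recordQueueTaskEnqueued` is an assumption: the `FederationMetricsLike` class and the `WeakMap` cache are simplified stand-ins so the example runs on its own, not the real metrics.ts implementation.

```typescript
// Simplified stand-in for the FederationMetrics class in metrics.ts.
interface QueueTaskContext {
  role: "inbox" | "outbox" | "fanout";
  queue: object;
  activityType?: string;
}

class FederationMetricsLike {
  readonly enqueued: { context: QueueTaskContext; attempt: number }[] = [];

  recordQueueTaskEnqueued(context: QueueTaskContext, attempt: number): void {
    this.enqueued.push({ context, attempt });
  }
}

// Stand-in for getFederationMetrics(): one metrics object per meter provider.
const metricsByProvider = new WeakMap<object, FederationMetricsLike>();

function getFederationMetrics(meterProvider: object): FederationMetricsLike {
  let metrics = metricsByProvider.get(meterProvider);
  if (metrics == null) {
    metrics = new FederationMetricsLike();
    metricsByProvider.set(meterProvider, metrics);
  }
  return metrics;
}

// The message parameter is a structural subset of OutboxMessage, so this
// module does not need to import the full type from queue.ts.
function recordOutboxEnqueued(
  meterProvider: object,
  outboxQueue: object,
  message: { activityType?: string; attempt: number },
): void {
  getFederationMetrics(meterProvider).recordQueueTaskEnqueued(
    { role: "outbox", queue: outboxQueue, activityType: message.activityType },
    message.attempt,
  );
}
```

Because the parameter type is structural, any object with `activityType` and `attempt` fields can be passed, including a full `OutboxMessage`.
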

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 529-535: The catch blocks currently set span.setStatus({ code:
SpanStatusCode.ERROR, message: String(e) }) unconditionally even when
isAbortError(e) is true; change each worker branch (fanout, outbox, inbox) so
that span.setStatus is only called when !isAbortError(e) — keep outcome =
isAbortError(e) ? "aborted" : "failed" but wrap the span.setStatus(...) call in
a non-abort guard (e.g., if (!isAbortError(e)) { span.setStatus(...) }) so
aborted tasks do not mark spans as errors.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: e77dd1ae-e1ff-454e-813c-c7363dbee89e

📥 Commits

Reviewing files that changed from the base of the PR and between f6624d9 and de2d3d9.

📒 Files selected for processing (2)
  • packages/fedify/src/federation/metrics.ts
  • packages/fedify/src/federation/middleware.ts

Comment thread packages/fedify/src/federation/middleware.ts
processQueuedTask() distinguishes aborted queue tasks via
fedify.queue.task.result=aborted so that backend shutdown signals do
not inflate the failed counter, but each catch block still set
SpanStatusCode.ERROR on the consumer span unconditionally.  The result
was that trace error-rate views reported every cancelled task as a
failure even though the metric side already excluded them.

Per the OpenTelemetry "Recording errors" guidance, span status MUST
remain UNSET for operations that ended without an actual error, and
should only flip to ERROR when there is one.  Cancellation does not
qualify on its own.

Guard the span.setStatus() calls in all three worker branches (fanout,
outbox, inbox) on isAbortError(e) being false.  The metric outcome
classification is unchanged.  An assertion on the abort-path test
confirms the inbox span's status code stays UNSET.

fedify-dev#759 (comment)

Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: CodeRabbit
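
The guard described in this commit can be sketched as below. This is a simplified illustration: `SpanLike` and `runQueuedTask` are hypothetical, the numeric status codes mirror the OpenTelemetry API's `SpanStatusCode` enum, and the function returns the outcome instead of rethrowing purely to keep the example small.

```typescript
// Mirrors the values of SpanStatusCode in the OpenTelemetry API.
const SpanStatusCode = { UNSET: 0, OK: 1, ERROR: 2 } as const;

// Minimal stand-in for the part of a span this sketch needs.
interface SpanLike {
  setStatus(status: { code: number; message?: string }): void;
}

// Abort detection by name, which covers DOMException aborts as well as
// aborts that arrive without a DOMException wrapper.
function isAbortError(error: unknown): boolean {
  return typeof error === "object" && error !== null &&
    (error as { name?: unknown }).name === "AbortError";
}

type TaskResult = "completed" | "failed" | "aborted";

async function runQueuedTask(
  span: SpanLike,
  task: () => Promise<void>,
): Promise<TaskResult> {
  try {
    await task();
    return "completed";
  } catch (e) {
    // Only genuine failures flip the span to ERROR; aborted tasks leave
    // the status untouched so it stays UNSET.
    if (!isAbortError(e)) {
      span.setStatus({ code: SpanStatusCode.ERROR, message: String(e) });
    }
    return isAbortError(e) ? "aborted" : "failed";
  }
}
```
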
@dahlia
Member Author

dahlia commented May 10, 2026

/gemini review

Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Code Review

This pull request introduces comprehensive OpenTelemetry metrics for Fedify's queue tasks, covering enqueueing, processing starts, completions, failures, durations, and in-flight counts for inbox, outbox, and fanout roles. The changes include updated documentation and extensive new test cases to verify metric accuracy. A review comment identified a potential issue in middleware.ts where enqueue metrics could be incorrectly recorded if a bulk enqueue operation fails; it is recommended to move the metric recording logic inside the success path of the try-catch block to ensure data integrity.

Comment thread packages/fedify/src/federation/middleware.ts
@dahlia
Member Author

dahlia commented May 10, 2026

@codex review

@chatgpt-codex-connector

Codex Review: Didn't find any major issues. Bravo.

@dahlia dahlia requested review from 2chanhaeng and sij411 May 10, 2026 09:33

Labels

  • component/federation: Federation object related
  • component/mq: Message queue related
  • component/otel: OpenTelemetry integration
  • type/enhancement: Improvements to existing features
  • type/feature: Requests for new features

Development

Successfully merging this pull request may close these issues.

OpenTelemetry metrics for queue task processing

1 participant