OpenTelemetry: queue task metrics#759
Conversation
Issue fedify-dev#740 asked for metrics covering Fedify's enqueue and worker boundaries for inbox, outbox, and fanout work. Backlog depth from MessageQueue.getDepth() answers "how much is waiting"; these metrics answer "how fast are workers draining it, how often are they failing, and how many tasks are running right now in this process". This change adds six instruments to FederationMetrics: fedify.queue.task.enqueued Counter, every Fedify enqueue including retries. fedify.queue.task.started Counter, when a worker begins. fedify.queue.task.completed Counter, when processing returns without throwing. fedify.queue.task.failed Counter, when processing throws a non-abort error. fedify.queue.task.duration Histogram in ms. fedify.queue.task.in_flight UpDownCounter, process-local; the in-flight series intentionally omits per-message attributes so increments and decrements pair up cleanly. Common attributes are fedify.queue.role, fedify.queue.backend (queue.constructor.name verbatim, omitted for plain object literals), fedify.queue.native_retrial, and activitypub.activity.type when available. The enqueue counter additionally carries fedify.queue.task.attempt; the completion-side instruments carry fedify.queue.task.result (completed, failed, or aborted). Aborts are detected via error?.name === "AbortError". Instrumentation points: - Initial inbox enqueue in routeActivity(). - Initial outbox enqueue in sendActivityInternal() and queued forwardActivityInternal(). - Initial fanout enqueue in Context.sendActivity(). - Retry re-enqueues in #listenOutboxMessage() and #listenInboxMessage(), guarded so the metric only fires when the enqueue actually happens. - All three branches of processQueuedTask() are wrapped with started/in-flight/completed-or-failed-or-aborted/duration. #listenInboxMessage() now exposes the parsed activity type to its caller via an onActivityType callback so processQueuedTask can attach activitypub.activity.type to the inbox-task completion measurements. When a queue does not declare nativeRetrial and Fedify catches an inbox listener or outbox delivery error to schedule a retry, the worker boundary records result=completed; the retry is observable as a new fedify.queue.task.enqueued measurement with non-zero fedify.queue.task.attempt, and outbox-side delivery failures remain observable through activitypub.delivery.* and the activitypub.delivery.failed span event. The OpenTelemetry manual page now documents this contrast explicitly. Tests cover success, native-retrial failure, internally-handled retry, abort, and forwardActivity enqueue paths. CHANGES.md, the OpenTelemetry manual, and the message queue manual are updated. fedify-dev#316 Closes fedify-dev#740 Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.2-codex
Improve the formatting of the OpenTelemetry queue task metrics list in CHANGES.md and add a reference to pull request fedify-dev#759.
|
@codex review |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughAdds OpenTelemetry queue task metrics (enqueued/started/completed/failed/duration/in_flight), integrates recording calls into enqueue and processing paths for inbox/outbox/fanout, adds types and recording methods to FederationMetrics, updates tests to assert emitted metrics, and documents the new metrics. ChangesQueue Task Observability
Sequence Diagram(s)sequenceDiagram
participant Client
participant QueueBackend
participant Worker
participant FederationMetrics
Client->>QueueBackend: enqueue message
QueueBackend->>Worker: deliver queued message
Worker->>FederationMetrics: recordQueueTaskStarted(common)
Worker->>Worker: process task
Worker->>FederationMetrics: recordQueueTaskOutcome(common, result, duration)
Worker->>FederationMetrics: decrementQueueTaskInFlight(common)
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/fedify/src/federation/handler.test.ts`:
- Around line 2543-2550: The test currently checks attributes on the enqueued
metric but should also assert the metric type and that the backend attribute is
absent: after calling recorder.getMeasurements("fedify.queue.task.enqueued") and
selecting enqueued[0], add an assertion that its descriptor.type (or the
measurement type property used in tests) equals the expected metric type (e.g.,
"counter" to match other tests like the one around line 2670) and an assertion
that enqueued[0].attributes does not have a "fedify.backend" key (or that
enqueued[0].attributes["fedify.backend"] is undefined) so plain Object metrics
omit backend. Ensure you reference the existing recorder.getMeasurements and
enqueued[0] when adding these assertions.
In `@packages/fedify/src/federation/inbox.ts`:
- Around line 181-184: ContextImpl.routeActivityInternal() calls routeActivity()
without passing the optional federation meter provider, so
getFederationMetrics(...).recordQueueTaskEnqueued ends up using the global
provider; update ContextImpl.routeActivityInternal to forward
this.federation.meterProvider when invoking routeActivity (and any internal
calls that reach the programmatic routeActivity path) so that routeActivity uses
the configured meter provider; specifically ensure the routeActivity(...)
invocation includes the federation.meterProvider argument and that
routeActivity’s signature/prop forwarding (if needed) accepts and passes that
provider through to getFederationMetrics.
In `@packages/fedify/src/federation/metrics.ts`:
- Around line 125-131: recordQueueTaskOutcome currently increments
fedify.queue.task.completed only when outcome === "completed", causing failed
and aborted terminal attempts to be omitted; update recordQueueTaskOutcome (and
the similar block around the second occurrence noted) so that
fedify.queue.task.completed is incremented for every terminal outcome
("completed", "failed", "aborted"), using the same label set (including result
label) you use for fedify.queue.task.result/duration, i.e., move or duplicate
the counter.increment call out of the "completed"-only branch and call it with {
result: outcome, queue: ..., worker: ... } so each terminal attempt is counted.
In `@packages/fedify/src/federation/middleware.test.ts`:
- Around line 4406-4468: Rename or update the test step to remove "enqueue" from
the title or explicitly assert there are no enqueue metrics: after calling
federation.processQueuedTask(...) in the test ("records outbox enqueue and
worker metrics"), add an assertion that
recorder.getMeasurements("fedify.queue.task.enqueued").length is 0 to ensure no
spurious enqueue metric is recorded; alternatively change the step title to
"records outbox worker metrics" if you prefer to not add the assertion. Ensure
the check is adjacent to the existing started/completed assertions and
references the same recorder used in this test.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 7bc87e07-7d0d-4f4e-98c9-8c809dc0bc05
📒 Files selected for processing (8)
CHANGES.mddocs/manual/mq.mddocs/manual/opentelemetry.mdpackages/fedify/src/federation/handler.test.tspackages/fedify/src/federation/inbox.tspackages/fedify/src/federation/metrics.tspackages/fedify/src/federation/middleware.test.tspackages/fedify/src/federation/middleware.ts
|
Codex Review: Didn't find any major issues. Keep them coming! ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
Codecov Report❌ Patch coverage is
... and 3 files with indirect coverage changes 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive set of OpenTelemetry metrics for Fedify's message queue tasks, covering enqueueing, starting, completion, failure, duration, and in-flight tasks across inbox, outbox, and fanout roles. It includes updates to the core federation logic to instrument these metrics, detailed documentation in the manual, and extensive test coverage for various success and failure scenarios. I have no feedback to provide.
ContextImpl.routeActivityInternal() called the routeActivity() helper in inbox.ts without passing this.federation.meterProvider, so the fedify.queue.task.enqueued counter recorded from the public Context.routeActivity() API path was emitted on the global MeterProvider instead of the configured one. The fetch-driven inbox enqueue path through handleInbox() forwarded the provider correctly, so this only affected callers using ctx.routeActivity() directly. Forward this.federation.meterProvider into the routeActivity() call and add a regression test that exercises the path with a configured test meter provider. fedify-dev#759 (comment) Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.2-codex
Two CodeRabbit nitpicks on the queue task metric tests: 1. handleInbox() inbox-enqueue test (handler.test.ts) now also asserts the measurement's instrument type is "counter", and that the fedify.queue.backend attribute is undefined when the queue is an object literal (whose constructor.name is "Object" and is filtered by getQueueBackend()). 2. processQueuedTask() outbox worker step (middleware.test.ts) is renamed to "records outbox worker metrics on successful delivery", since the body asserts only worker-side started/completed and never exercised the enqueue counter. An assertion that no enqueue measurement is emitted on the success path is added so accidental double-counting in #listenOutboxMessage() would fail this test. fedify-dev#759 (comment) fedify-dev#759 (comment) Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Codex:gpt-5.2-codex
|
/gemini review |
|
@codex review |
|
Codex Review: Didn't find any major issues. Can't wait for the next one! ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive suite of OpenTelemetry metrics for Fedify's queue system, covering inbox, outbox, and fanout roles. It adds counters for task lifecycle events (enqueued, started, completed, failed), a histogram for processing duration, and an up/down counter for tracking in-flight tasks. The implementation includes core logic updates, detailed documentation, and extensive unit tests. Feedback suggests improving the duration histogram by defining explicit bucket boundaries for better aggregation and notes a limitation in correlating 'started' metrics with activity types for inbox tasks since parsing occurs after the task begins.
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
packages/fedify/src/federation/middleware.ts (1)
1387-1460: 🧹 Nitpick | 🔵 Trivial | ⚡ Quick winConsolidate the two per-message enqueue branches in
sendActivity.Lines 1397–1419 (when
enqueueMany == null) and lines 1423–1447 (whenenqueueManyexists butorderingKey != null) are nearly identical: both doPromise.allSettledover per-messageoutboxQueue.enqueuecalls with the same error aggregation. The sibling implementation inforwardActivityInternal(line 3394) already collapses these cases withoutboxQueue.enqueueMany == null || orderingKey != null. Aligning here removes ~25 duplicated lines and one slightly diverging error path.♻️ Proposed consolidation
- if (outboxQueue.enqueueMany == null) { + if (outboxQueue.enqueueMany == null || orderingKey != null) { const promises: PromiseSettledResult<void>[] = await Promise.allSettled( messages.map(async (m) => { await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); recordOutboxEnqueued(m.message); }), ); - const errors = promises + const errors: unknown[] = promises .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { logger.error( "Failed to enqueue activity {activityId} to send later: {errors}", { activityId: activity.id!.href, errors }, ); if (errors.length > 1) { throw new AggregateError( errors, `Failed to enqueue activity ${activityId} to send later.`, ); } throw errors[0]; } } else { - // Note: enqueueMany does not support per-message orderingKey, - // so we fall back to individual enqueues when orderingKey is specified - if (orderingKey != null) { - const promises: PromiseSettledResult<void>[] = await Promise.allSettled( - messages.map(async (m) => { - await outboxQueue.enqueue(m.message, { - orderingKey: m.orderingKey, - }); - recordOutboxEnqueued(m.message); - }), - ); - const errors = promises - .filter((r) => r.status === "rejected") - .map((r) => (r as PromiseRejectedResult).reason); - if (errors.length > 0) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {errors}", - { activityId: activity.id!.href, errors }, - ); - if (errors.length > 1) { - throw new AggregateError( - errors, - `Failed to enqueue activity ${activityId} to send later.`, - ); - } - throw errors[0]; - } - } else { - try { - await outboxQueue.enqueueMany(messages.map((m) => m.message)); - } catch (error) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {error}", - { activityId: activity.id!.href, error }, - ); - throw error; - } - for (const m of messages) recordOutboxEnqueued(m.message); - } + try { + await outboxQueue.enqueueMany(messages.map((m) => m.message)); + } catch (error) { + logger.error( + "Failed to enqueue activity {activityId} to send later: {error}", + { activityId: activity.id!.href, error }, + ); + throw error; + } + for (const m of messages) recordOutboxEnqueued(m.message); }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/fedify/src/federation/middleware.ts` around lines 1387 - 1460, The two nearly identical per-message enqueue branches in sendActivity should be consolidated: change the conditional that currently checks outboxQueue.enqueueMany == null (and the separate orderingKey != null branch) into a single branch using if (outboxQueue.enqueueMany == null || orderingKey != null) and run the Promise.allSettled over messages calling outboxQueue.enqueue with orderingKey, invoking recordOutboxEnqueued for each, then aggregate errors exactly as done today (logger.error, throw AggregateError when >1, else throw the single error). Keep the else path that calls outboxQueue.enqueueMany(messages.map(m => m.message)) with its try/catch and post-success recordOutboxEnqueued loop unchanged; remove the duplicate Promise.allSettled block previously duplicated in sendActivity.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 1387-1396: The duplicate recordOutboxEnqueued closure used in
sendActivity and forwardActivityInternal should be extracted to a single
reusable function; add a new free function (e.g.,
recordOutboxEnqueued(meterProvider, outboxQueue, message: OutboxMessage)) in
metrics.ts (or a method on FederationImpl) that calls
getFederationMetrics(meterProvider).recordQueueTaskEnqueued({ role: "outbox",
queue: outboxQueue, activityType: message.activityType }, message.attempt); then
replace both local closures (the ones named recordOutboxEnqueued in sendActivity
and forwardActivityInternal) with calls to that new function, keeping parameter
names meterProvider, outboxQueue and message to make the swap mechanical and
ensure future attribute additions only require one update site.
---
Outside diff comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 1387-1460: The two nearly identical per-message enqueue branches
in sendActivity should be consolidated: change the conditional that currently
checks outboxQueue.enqueueMany == null (and the separate orderingKey != null
branch) into a single branch using if (outboxQueue.enqueueMany == null ||
orderingKey != null) and run the Promise.allSettled over messages calling
outboxQueue.enqueue with orderingKey, invoking recordOutboxEnqueued for each,
then aggregate errors exactly as done today (logger.error, throw AggregateError
when >1, else throw the single error). Keep the else path that calls
outboxQueue.enqueueMany(messages.map(m => m.message)) with its try/catch and
post-success recordOutboxEnqueued loop unchanged; remove the duplicate
Promise.allSettled block previously duplicated in sendActivity.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0c8aae44-43ba-4679-8df0-fc189902d315
📒 Files selected for processing (3)
packages/fedify/src/federation/handler.test.tspackages/fedify/src/federation/middleware.test.tspackages/fedify/src/federation/middleware.ts
The per-message enqueue branch that ran when the queue lacked enqueueMany() was structurally identical to the one that ran when enqueueMany() existed but the caller specified an orderingKey: both fell back to individual enqueue() calls and recorded recordOutboxEnqueued() for each. Folding both conditions into a single fallback (`enqueueMany == null || orderingKey != null`) removes a copy of the Promise.allSettled error-aggregation block, leaving the enqueueMany() success path untouched. No behavior change. Assisted-by: Claude Code:claude-opus-4-7
Use the same OpenTelemetry HTTP server semantic-conventions buckets (5 ms to 10 s) that fedify.http.server.request.duration already uses. Queue task durations span a similar range: in-process inbox listeners finish in single-digit milliseconds, while outbox delivery to a slow remote inbox can take several seconds. Without explicit buckets, exporters fall back to backend-specific defaults that vary across Prometheus, OTLP/Tempo, and Datadog and make latency dashboards inconsistent across deployments. fedify-dev#759 (comment) Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: Gemini Code Assist
There was a problem hiding this comment.
♻️ Duplicate comments (1)
packages/fedify/src/federation/middleware.ts (1)
1387-1396: 🧹 Nitpick | 🔵 Trivial | 💤 Low valueDuplicate of past review comment.
This
recordOutboxEnqueuedhelper and the one inforwardActivityInternal(lines 3358–3367) are structurally identical; consolidating them was already raised. Flagging here only for tracking.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@packages/fedify/src/federation/middleware.ts` around lines 1387 - 1396, Duplicate helper functions exist: consolidate the two identical recordOutboxEnqueued implementations into a single shared helper and call it from both places. Extract the logic that calls getFederationMetrics(...).recordQueueTaskEnqueued(...) (currently inside recordOutboxEnqueued and the version inside forwardActivityInternal) into one exported/top-level function (e.g., recordOutboxEnqueued or recordQueueTaskEnqueuedHelper), preserve the same parameters (message: OutboxMessage, outboxQueue, meterProvider context), replace the duplicate blocks with calls to that shared function, and ensure imports/exports are updated where needed so both call sites use the single implementation.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 1387-1396: Duplicate helper functions exist: consolidate the two
identical recordOutboxEnqueued implementations into a single shared helper and
call it from both places. Extract the logic that calls
getFederationMetrics(...).recordQueueTaskEnqueued(...) (currently inside
recordOutboxEnqueued and the version inside forwardActivityInternal) into one
exported/top-level function (e.g., recordOutboxEnqueued or
recordQueueTaskEnqueuedHelper), preserve the same parameters (message:
OutboxMessage, outboxQueue, meterProvider context), replace the duplicate blocks
with calls to that shared function, and ensure imports/exports are updated where
needed so both call sites use the single implementation.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 4f80b376-8728-446b-9b0d-539008648bb6
📒 Files selected for processing (1)
packages/fedify/src/federation/middleware.ts
|
/gemini review |
|
@codex review |
There was a problem hiding this comment.
Code Review
This pull request introduces OpenTelemetry metrics for Fedify's queue tasks, enabling monitoring of enqueueing, processing, and in-flight tasks across inbox, outbox, and fanout roles. The changes include new metric instruments in the FederationMetrics class, middleware integration, and updated documentation. Review feedback identifies performance bottlenecks in batch operations where metric attributes are redundantly calculated, recommending that these be pre-calculated outside of loops to reduce overhead.
|
Codex Review: Didn't find any major issues. You're on a roll. ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
sendActivityInternal() and forwardActivityInternal() each defined an
identical recordOutboxEnqueued() closure that just forwarded
(activityType, attempt) to FederationMetrics.recordQueueTaskEnqueued()
under role=outbox. Move the body to a single top-level function in
metrics.ts, so future attribute additions only need one update site.
The shared helper takes the meter provider, the outbox queue, and a
structural { activityType, attempt } subset of OutboxMessage. Keeping
the parameter as a structural type means metrics.ts does not need to
import from queue.ts.
fedify-dev#759 (comment)
Assisted-by: Claude Code:claude-opus-4-7
Assisted-by: CodeRabbit
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/fedify/src/federation/middleware.ts`:
- Around line 529-535: The catch blocks currently set span.setStatus({ code:
SpanStatusCode.ERROR, message: String(e) }) unconditionally even when
isAbortError(e) is true; change each worker branch (fanout, outbox, inbox) so
that span.setStatus is only called when !isAbortError(e) — keep outcome =
isAbortError(e) ? "aborted" : "failed" but wrap the span.setStatus(...) call in
a non-abort guard (e.g., if (!isAbortError(e)) { span.setStatus(...) }) so
aborted tasks do not mark spans as errors.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: e77dd1ae-e1ff-454e-813c-c7363dbee89e
📒 Files selected for processing (2)
packages/fedify/src/federation/metrics.tspackages/fedify/src/federation/middleware.ts
processQueuedTask() distinguishes aborted queue tasks via fedify.queue.task.result=aborted so that backend shutdown signals do not inflate the failed counter, but each catch block still set SpanStatusCode.ERROR on the consumer span unconditionally. The result was that trace error-rate views reported every cancelled task as a failure even though the metric side already excluded them. Per the OpenTelemetry "Recording errors" guidance, span status MUST remain UNSET for operations that ended without an actual error, and should only flip to ERROR when there is one. Cancellation does not qualify on its own. Guard the span.setStatus() calls in all three worker branches (fanout, outbox, inbox) on isAbortError(e) being false. The metric outcome classification is unchanged. An assertion on the abort-path test confirms the inbox span's status code stays UNSET. fedify-dev#759 (comment) Assisted-by: Claude Code:claude-opus-4-7 Assisted-by: CodeRabbit
|
/gemini review |
There was a problem hiding this comment.
Code Review
This pull request introduces comprehensive OpenTelemetry metrics for Fedify's queue tasks, covering enqueueing, processing starts, completions, failures, durations, and in-flight counts for inbox, outbox, and fanout roles. The changes include updated documentation and extensive new test cases to verify metric accuracy. A review comment identified a potential issue in middleware.ts where enqueue metrics could be incorrectly recorded if a bulk enqueue operation fails; it is recommended to move the metric recording logic inside the success path of the try-catch block to ensure data integrity.
|
@codex review |
|
Codex Review: Didn't find any major issues. Bravo. ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
This adds OpenTelemetry metrics for Fedify queue task processing: inbox, outbox, and fanout.
MessageQueue.getDepth()(added in #748) reports backend backlog depth; this PR adds the worker side, so operators can see how fast Fedify is draining the queue, how often workers fail, and how many tasks are in flight per process.Closes #740. Part of #316.
Changes
Added six metric instruments in @fedify/fedify:
fedify.queue.task.enqueued(counter,{task})fedify.queue.task.started(counter,{task})fedify.queue.task.completed(counter,{task})fedify.queue.task.failed(counter,{task})fedify.queue.task.duration(histogram,ms)fedify.queue.task.in_flight(up/down counter,{task})Common attributes:
fedify.queue.role(inbox,outbox, orfanout),fedify.queue.backend(queue.constructor.nameverbatim, omitted when the constructor is the plainObject),fedify.queue.native_retrial, andactivitypub.activity.typewhenever Fedify knows the activity type. The enqueue counter additionally carriesfedify.queue.task.attempt(zero-based) so retry re-enqueues are distinguishable from initial enqueues. The completion-side instruments carryfedify.queue.task.result(completed,failed, oraborted).The in-flight up/down counter intentionally omits per-message attributes (activity type, attempt, result) so that increment and decrement operations always pair up cleanly per attribute series. It is process-local; cross-process aggregation is left to the metrics backend.
Aborts are detected via
error?.name === "AbortError", which covers bothDOMExceptionaborts and aborts that arrive without aDOMExceptionwrapper. Aborted tasks recordresult=abortedand do not incrementfedify.queue.task.failed.Instrumented enqueue boundaries: the inbound path through
routeActivity()in packages/fedify/src/federation/inbox.ts, the outbound path throughsendActivityInternal()and the queuedforwardActivityInternal()in packages/fedify/src/federation/middleware.ts, and the fanout enqueue inContext.sendActivity(). Retry re-enqueues in#listenOutboxMessage()and#listenInboxMessage()record the metric only when the queue is configured.Instrumented worker boundary: each branch of
processQueuedTask()is wrapped with started counter, in-flight ±1 around the task body, and completed/failed/aborted plus duration on outcome.#listenInboxMessage()exposes the parsed activity type via anonActivityTypecallback so the inbox completion measurements can carry it.Tests in packages/fedify/src/federation/middleware.test.ts cover the success, native-retrial failure, internally-handled retry, abort, fanout, and
forwardActivityenqueue paths. packages/fedify/src/federation/handler.test.ts adds a test for the inbox enqueue metric throughhandleInbox().Updated docs/manual/opentelemetry.md with the new instruments, attributes, semantic-attribute table entries, and a paragraph contrasting these process-side metrics with the backend-side
MessageQueue.getDepth(). Cross-referenced from docs/manual/mq.md.Updated CHANGES.md.
Notes
When a queue does not declare
nativeRetrialand Fedify catches an inbox listener or outbox delivery error to schedule a retry, the worker boundary recordsresult=completedrather thanfailed, sinceprocessQueuedTask()returned without throwing. The retry remains observable as a newfedify.queue.task.enqueuedmeasurement with non-zerofedify.queue.task.attempt, and outbox-side delivery failures stay observable throughactivitypub.delivery.*metrics and theactivitypub.delivery.failedspan event. Inbox listener errors that the retry policy abandons are visible through error logs and the inbox span's error status, but not through a dedicated metric. This is deliberate:completed/faileddescribe the worker boundary, not the activity-level outcome.The issue's wording suggested an observable gauge for
in_flight, but anUpDownCountermatches the synchronous start/end lifecycle: increment before processing, decrement infinally. It is also testable with the existing @fedify/fixtureTestMeter.The
fedify.queue.backendattribute is the queue class'sconstructor.nameas-is (RedisMessageQueue,InProcessMessageQueue, etc.) rather than a transformed short name, since application-controlled backend implementations should not be renamed by Fedify.Verification
mise checkmise test:denomise test:node