27 changes: 27 additions & 0 deletions CHANGES.md
@@ -51,15 +51,42 @@ To be released.
deliberately exclude raw URLs, query strings, and identifier values to
keep cardinality bounded. [[#316], [#736], [#757]]

- Added OpenTelemetry queue task metrics covering Fedify's enqueue and
worker boundaries for inbox, outbox, and fanout work:

- `fedify.queue.task.enqueued` (counter)
- `fedify.queue.task.started` (counter)
- `fedify.queue.task.completed` (counter)
- `fedify.queue.task.failed` (counter)
- `fedify.queue.task.duration` (histogram)
- `fedify.queue.task.in_flight` (up/down counter, process local)

Instruments carry `fedify.queue.role`, best-effort
`fedify.queue.backend` (the queue implementation's constructor name),
and `fedify.queue.native_retrial`. The enqueue/started/completed/
failed/duration instruments additionally carry
`activitypub.activity.type` whenever Fedify knows the activity type
for the queued message; the in-flight up/down counter deliberately
omits per-message attributes so that increment and decrement
operations always pair up cleanly per attribute series. Enqueue
measurements additionally carry `fedify.queue.task.attempt` for
retries, and the completion-side instruments carry
`fedify.queue.task.result` (`completed`, `failed`, or `aborted`).
Together with `MessageQueue.getDepth()` reporting, these metrics let
operators distinguish a slow-draining queue from a queue that sees
less traffic. [[#316], [#740], [#759]]

[#316]: https://github.com/fedify-dev/fedify/issues/316
[#619]: https://github.com/fedify-dev/fedify/issues/619
[#735]: https://github.com/fedify-dev/fedify/issues/735
[#736]: https://github.com/fedify-dev/fedify/issues/736
[#740]: https://github.com/fedify-dev/fedify/issues/740
[#748]: https://github.com/fedify-dev/fedify/pull/748
[#752]: https://github.com/fedify-dev/fedify/issues/752
[#753]: https://github.com/fedify-dev/fedify/pull/753
[#755]: https://github.com/fedify-dev/fedify/pull/755
[#757]: https://github.com/fedify-dev/fedify/pull/757
[#759]: https://github.com/fedify-dev/fedify/pull/759

### @fedify/fixture

8 changes: 8 additions & 0 deletions docs/manual/mq.md
@@ -1023,6 +1023,14 @@ outbox, and fanout work, observability code should report that queue once as a
shared queue. Reporting the same `getDepth()` result separately for each
logical role would double- or triple-count the backlog.

Queue depth covers only the *backend* side of the queue. To see what
Fedify's workers are doing with the dequeued messages — enqueue rate, task
processing duration, completion versus failure, and how many tasks are in
flight per process — read the matching [`fedify.queue.task.*` OpenTelemetry
metrics](./opentelemetry.md#instrumented-metrics). Backlog depth and task
throughput together let you tell a slowly draining queue apart from one
that simply sees less traffic.
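
As a rough illustration of that distinction, the sketch below combines a backlog depth with task rates. The `QueueSample` shape, the `diagnose()` helper, and both thresholds are hypothetical; they stand in for whatever a metrics backend derives from `getDepth()` reporting and the `fedify.queue.task.*` counters.

```typescript
// Hypothetical snapshot assembled in a metrics backend; not a Fedify type.
interface QueueSample {
  depth: number; // backlog derived from MessageQueue.getDepth() reporting
  enqueuedPerMinute: number; // rate of fedify.queue.task.enqueued
  completedPerMinute: number; // rate of fedify.queue.task.completed
}

type Diagnosis = "slow-draining" | "low-traffic" | "keeping-up";

// A stalled queue and an idle queue both show few completions, so the
// completion rate alone is ambiguous; the backlog depth breaks the tie.
function diagnose(
  sample: QueueSample,
  lowRatePerMinute = 10, // assumed threshold for "few completions"
  maxHealthyDepth = 100, // assumed threshold for "acceptable backlog"
): Diagnosis {
  if (
    sample.depth > maxHealthyDepth &&
    sample.completedPerMinute < sample.enqueuedPerMinute
  ) {
    return "slow-draining";
  }
  if (sample.completedPerMinute < lowRatePerMinute) return "low-traffic";
  return "keeping-up";
}

const stalled = diagnose({
  depth: 4000,
  enqueuedPerMinute: 50,
  completedPerMinute: 5,
});
const idle = diagnose({ depth: 0, enqueuedPerMinute: 5, completedPerMinute: 5 });
const busy = diagnose({
  depth: 20,
  enqueuedPerMinute: 300,
  completedPerMinute: 300,
});
```

The `stalled` and `idle` samples complete the same five tasks per minute; only the depth separates them.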

[^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and
delayed queues created by the same `AmqpMessageQueue` instance.
AMQP 0-9-1 does not provide a portable queue-listing API, so
83 changes: 74 additions & 9 deletions docs/manual/opentelemetry.md
@@ -296,15 +296,21 @@ Instrumented metrics

Fedify records the following OpenTelemetry metrics:

| Metric name | Instrument | Unit | Description |
| -------------------------------------------- | ------------- | ----------- | --------------------------------------------------------------- |
| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. |
| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. |
| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. |
| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. |
| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. |
| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. |
| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. |
| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. |
| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. |
| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. |
| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. |
| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. |
| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. |

### Metric attributes

@@ -339,6 +345,60 @@ Fedify records the following OpenTelemetry metrics:
parameter names (for example `/users/{identifier}`) rather than the
matched parameter values.

`fedify.queue.task.enqueued`, `fedify.queue.task.started`,
`fedify.queue.task.completed`, `fedify.queue.task.failed`, and
`fedify.queue.task.duration`
: `fedify.queue.role` (`inbox`, `outbox`, or `fanout`) is always present.
`fedify.queue.backend` is the queue implementation's constructor name
(for example `RedisMessageQueue`) when available; it is omitted for
queues whose constructor is the plain `Object` (for example,
`MessageQueue` instances built from an object literal).
`fedify.queue.native_retrial` reflects the queue backend's `nativeRetrial`
flag when set on the queue. `activitypub.activity.type` is recorded
whenever Fedify knows the activity type for the queued message; for inbox
tasks the type only becomes available after the activity is parsed, so the
*started* counter for inbox tasks may be recorded without it.
`fedify.queue.task.enqueued` additionally carries a zero-based
`fedify.queue.task.attempt` so that retry re-enqueues are distinguishable
from initial enqueues. `fedify.queue.task.completed`,
`fedify.queue.task.failed`, and `fedify.queue.task.duration` carry
`fedify.queue.task.result`, which is `completed` when processing returned
without throwing, `failed` when the worker re-threw a non-abort error, and
`aborted` when the worker re-threw an `AbortError` (for example, because a
graceful-shutdown `AbortSignal` interrupted processing). When the queue
backend does not declare `nativeRetrial`, Fedify catches inbox listener and
outbox delivery errors itself; if its retry policy still allows another
attempt, it schedules a retry by re-enqueuing the message and returns from
the worker without re-throwing, so the worker boundary records
`result=completed`. When the retry policy gives up, the worker also
returns normally (`result=completed`) without scheduling a retry.
Outbox-side activity failures remain observable through the
`activitypub.delivery.*` metrics and the `activitypub.delivery.failed`
span event, and any retry attempt — inbox or outbox — appears as a
`fedify.queue.task.enqueued` measurement with a non-zero
`fedify.queue.task.attempt`. Inbox listener errors that the retry policy
abandons are visible through error logs and the inbox span's error status,
but not through a dedicated metric.

`fedify.queue.task.in_flight`
: `fedify.queue.role` and `fedify.queue.backend` (when available), plus
`fedify.queue.native_retrial` when set on the queue. Per-message
attributes such as `activitypub.activity.type`,
`fedify.queue.task.attempt`, and `fedify.queue.task.result` are
deliberately omitted so that increment and decrement operations always
pair up cleanly per attribute series. This UpDownCounter is
process-local: it tracks tasks currently being processed *in this
Fedify process*, not cross-process totals. Aggregate it across
replicas in your metrics backend.
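
One way to see why `fedify.queue.task.in_flight` sticks to stable attributes is to model a counter as one running sum per distinct attribute set. The `InFlightModel` class below is a hypothetical stand-in, not an OpenTelemetry or Fedify API, and the "skewed" case is an assumed scenario based on the inbox activity type only becoming known after parsing.

```typescript
type Attributes = Record<string, string | boolean>;

// Minimal stand-in for an UpDownCounter: one running sum per distinct
// attribute set, which is how metric series are keyed.
class InFlightModel {
  private readonly series = new Map<string, number>();

  add(delta: number, attributes: Attributes): void {
    const key = JSON.stringify(
      Object.entries(attributes).sort(([a], [b]) => a.localeCompare(b)),
    );
    this.series.set(key, (this.series.get(key) ?? 0) + delta);
  }

  // Number of series whose value did not return to zero.
  unbalanced(): number {
    return Array.from(this.series.values()).filter((v) => v !== 0).length;
  }
}

const stable: Attributes = { "fedify.queue.role": "inbox" };

// Stable attributes only: the increment and decrement hit the same series.
const paired = new InFlightModel();
paired.add(1, stable);
paired.add(-1, stable);

// Hypothetical alternative: the activity type is unknown when the task
// starts but known when it ends, so the decrement lands on another series.
const skewed = new InFlightModel();
skewed.add(1, stable);
skewed.add(-1, {
  ...stable,
  "activitypub.activity.type": "https://www.w3.org/ns/activitystreams#Create",
});
```

In this model `paired` ends with every series at zero, while `skewed` leaves one series stuck at +1 and another at -1.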

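The `fedify.queue.task.result` values described above can be sketched as a small classifier. This is a simplified model under stated assumptions, not Fedify's implementation: `classify()` and `isAbortError()` are hypothetical names, the abort check by error name is an assumption, and tasks are synchronous only to keep the example short.

```typescript
type QueueTaskResult = "completed" | "failed" | "aborted";

// Assumption: an abort is recognized by the error's name, as with the
// DOMException that AbortSignal produces. Fedify's actual check may differ.
function isAbortError(error: unknown): boolean {
  return typeof error === "object" && error !== null &&
    (error as { name?: unknown }).name === "AbortError";
}

// Runs a task and returns the label its outcome maps to. A real worker
// would re-throw the error; this sketch swallows it.
function classify(task: () => void): QueueTaskResult {
  try {
    task();
    return "completed";
  } catch (error) {
    return isAbortError(error) ? "aborted" : "failed";
  }
}

const retryQueue: string[] = [];

const outcomes = {
  ok: classify(() => {}),
  threw: classify(() => {
    throw new Error("listener crashed");
  }),
  shutdown: classify(() => {
    const abort = new Error("shutting down");
    abort.name = "AbortError";
    throw abort;
  }),
  // The task handles its own failure by scheduling a retry and returning,
  // so the worker boundary never sees the error.
  retried: classify(() => {
    try {
      throw new Error("remote server unavailable");
    } catch {
      retryQueue.push("attempt 1");
    }
  }),
};
```

The `retried` case mirrors the behavior without `nativeRetrial`: the failure is handled inside the task, so the boundary reports `completed` and the retry shows up only as a new enqueue.
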
The `fedify.queue.task.*` metrics describe what Fedify's workers do with
queued messages. They complement the backend-side
[`MessageQueue.getDepth()` API](./mq.md#queue-depth-reporting), which
reports how many messages are currently waiting in the queue backend.
Reading both signals together — task throughput plus backlog depth —
makes it possible to distinguish a small, slow queue from a large, fast
one and to set alerting thresholds for delivery latency under load.
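
As a back-of-the-envelope sketch of combining the two signals, backlog depth divided by completion rate gives a rough time to drain. The `DrainInput` shape and `estimateDrainSeconds()` are hypothetical, and the estimate assumes the completion rate holds steady with no new arrivals.

```typescript
// Hypothetical inputs computed in a metrics backend; not Fedify types.
interface DrainInput {
  depth: number; // messages waiting, from getDepth() reporting
  completedPerSecond: number; // rate of fedify.queue.task.completed
}

// Estimates how long the current backlog takes to clear, assuming the
// completion rate holds and no new messages arrive in the meantime.
function estimateDrainSeconds(
  { depth, completedPerSecond }: DrainInput,
): number {
  if (depth <= 0) return 0;
  if (completedPerSecond <= 0) return Infinity;
  return depth / completedPerSecond;
}

const smallSlow = estimateDrainSeconds({ depth: 50, completedPerSecond: 0.5 });
const largeFast = estimateDrainSeconds({ depth: 5000, completedPerSecond: 100 });
```

With these made-up numbers the shallow backlog needs about twice as long to clear as the deep one, which is why depth alone makes a weak alerting signal.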

Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths,
and query strings are deliberately excluded to keep metric cardinality bounded.
Activity types use the same qualified URI form as Fedify's trace attributes,
@@ -407,6 +467,11 @@ for ActivityPub:
| `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` |
| `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` |
| `fedify.collection.items` | number | The number of items in the collection page. It can be less than the total items. | `10` |
| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` |
| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` |
| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` |
| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` |
| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` |
| `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` |
| `http.response.status_code` | int | The HTTP response status code. | `200` |
| `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` |
101 changes: 101 additions & 0 deletions packages/fedify/src/federation/handler.test.ts
@@ -57,6 +57,7 @@ import {
import { ActivityListenerSet } from "./activity-listener.ts";
import { MemoryKvStore } from "./kv.ts";
import { createFederation } from "./middleware.ts";
import type { MessageQueue } from "./mq.ts";

const QUOTE_CONTEXT_TERMS = {
QuoteAuthorization: "https://w3id.org/fep/044f#QuoteAuthorization",
@@ -2453,6 +2454,106 @@ test("handleInbox() records OpenTelemetry span events", async () => {
);
});

test("handleInbox() records fedify.queue.task.enqueued when queued", async () => {
  const [meterProvider, recorder] = createTestMeterProvider();
  const kv = new MemoryKvStore();
  const federation = createFederation<void>({
    kv,
    meterProvider,
  });

  const activity = new Create({
    id: new URL("https://example.com/activities/queued"),
    actor: new URL("https://example.com/users/someone"),
    object: new Note({
      id: new URL("https://example.com/note-queued"),
      content: "Queue me up",
    }),
  });

  const request = new Request("https://example.com/users/someone/inbox", {
    method: "POST",
    headers: { "Content-Type": "application/activity+json" },
    body: JSON.stringify(await activity.toJsonLd()),
  });
  const signed = await signRequest(
    request,
    rsaPrivateKey3,
    new URL("https://example.com/users/someone#main-key"),
  );

  const context = createRequestContext<void>({
    federation,
    request: signed,
    url: new URL(signed.url),
    data: undefined,
    documentLoader: mockDocumentLoader,
    contextLoader: mockDocumentLoader,
    getActorUri(identifier: string) {
      return new URL(`https://example.com/users/${identifier}`);
    },
  });

  const actorDispatcher: ActorDispatcher<void> = (ctx, identifier) => {
    if (identifier !== "someone") return null;
    return new Person({
      id: ctx.getActorUri(identifier),
      name: "Someone",
      inbox: new URL("https://example.com/users/someone/inbox"),
      publicKey: rsaPublicKey2,
    });
  };

  const queuedMessages: unknown[] = [];
  const queue: MessageQueue = {
    enqueue(message, _options) {
      queuedMessages.push(message);
      return Promise.resolve();
    },
    listen(_handler, _options) {
      return Promise.resolve();
    },
  };

  const response = await handleInbox(signed, {
    recipient: "someone",
    context,
    inboxContextFactory(_activity) {
      return createInboxContext({ ...context, clone: undefined });
    },
    kv,
    kvPrefixes: {
      activityIdempotence: ["activityIdempotence"],
      publicKey: ["publicKey"],
      acceptSignatureNonce: ["acceptSignatureNonce"],
    },
    actorDispatcher,
    inboxListeners: new ActivityListenerSet<InboxContext<void>>(),
    inboxErrorHandler: undefined,
    onNotFound: (_request) => new Response("Not found", { status: 404 }),
    signatureTimeWindow: false,
    skipSignatureVerification: true,
    queue,
    meterProvider,
  });

  assertEquals(response.status, 202);
  assertEquals(queuedMessages.length, 1);

  const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued");
  assertEquals(enqueued.length, 1);
  assertEquals(enqueued[0].type, "counter");
  assertEquals(enqueued[0].attributes["fedify.queue.role"], "inbox");
  assertEquals(enqueued[0].attributes["fedify.queue.task.attempt"], 0);
  assertEquals(
    enqueued[0].attributes["activitypub.activity.type"],
    "https://www.w3.org/ns/activitystreams#Create",
  );
  // The queue here is an object literal, so getQueueBackend() should omit the
  // backend attribute rather than emit "Object".
  assertEquals(enqueued[0].attributes["fedify.queue.backend"], undefined);
});

test("handleInbox() records unverified HTTP signature details", async () => {
const [tracerProvider, exporter] = createTestTracerProvider();
const [meterProvider, recorder] = createTestMeterProvider();
4 changes: 4 additions & 0 deletions packages/fedify/src/federation/inbox.ts
@@ -178,6 +178,10 @@ export async function routeActivity<TContextData>(
});
throw error;
}
getFederationMetrics(meterProvider).recordQueueTaskEnqueued(
{ role: "inbox", queue, activityType: getTypeId(activity).href },
0,
);
logger.info(
"Activity {activityId} is enqueued.",
{ activityId: activity.id?.href, activity: json, recipient },