diff --git a/CHANGES.md b/CHANGES.md index ba213c2b2..662e9f8e5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -51,15 +51,42 @@ To be released. deliberately exclude raw URLs, query strings, and identifier values to keep cardinality bounded. [[#316], [#736], [#757]] + - Added OpenTelemetry queue task metrics covering Fedify's enqueue and + worker boundaries for inbox, outbox, and fanout work: + + - `fedify.queue.task.enqueued` (counter) + - `fedify.queue.task.started` (counter) + - `fedify.queue.task.completed` (counter) + - `fedify.queue.task.failed` (counter) + - `fedify.queue.task.duration` (histogram) + - `fedify.queue.task.in_flight` (up/down counter, process local) + + Instruments carry `fedify.queue.role`, best-effort + `fedify.queue.backend` (the queue implementation's constructor name), + and `fedify.queue.native_retrial`. The enqueue/started/completed/ + failed/duration instruments additionally carry + `activitypub.activity.type` whenever Fedify knows the activity type + for the queued message; the in-flight up/down counter deliberately + omits per-message attributes so that increment and decrement + operations always pair up cleanly per attribute series. Enqueue + measurements additionally carry `fedify.queue.task.attempt` for + retries, and the completion-side instruments carry + `fedify.queue.task.result` (`completed`, `failed`, or `aborted`). + Together with `MessageQueue.getDepth()` reporting, these metrics let + operators distinguish a slow-draining queue from a queue that sees + less traffic. [[#316], [#740], [#759]] + [#316]: https://github.com/fedify-dev/fedify/issues/316 [#619]: https://github.com/fedify-dev/fedify/issues/619 [#735]: https://github.com/fedify-dev/fedify/issues/735 [#736]: https://github.com/fedify-dev/fedify/issues/736 +[#740]: https://github.com/fedify-dev/fedify/issues/740 [#748]: https://github.com/fedify-dev/fedify/pull/748 [#752]: https://github.com/fedify-dev/fedify/issues/752 [#753]: https://github.com/fedify-dev/fedify/pull/753 [#755]: https://github.com/fedify-dev/fedify/pull/755 [#757]: https://github.com/fedify-dev/fedify/pull/757 +[#759]: https://github.com/fedify-dev/fedify/pull/759 ### @fedify/fixture diff --git a/docs/manual/mq.md b/docs/manual/mq.md index db78998d3..e2948967d 100644 --- a/docs/manual/mq.md +++ b/docs/manual/mq.md @@ -1023,6 +1023,14 @@ outbox, and fanout work, observability code should report that queue once as a shared queue. Reporting the same `getDepth()` result separately for each logical role would double- or triple-count the backlog. +Queue depth covers only the *backend* side of the queue. To see what +Fedify's workers are doing with the dequeued messages — enqueue rate, task +processing duration, completion versus failure, and how many tasks are in +flight per process — read the matching [`fedify.queue.task.*` OpenTelemetry +metrics](./opentelemetry.md#instrumented-metrics). Backlog depth and task +throughput together let you tell a slowly draining queue apart from one +that simply sees less traffic. + [^amqp-depth]: `AmqpMessageQueue` can count the configured ready queues and delayed queues created by the same `AmqpMessageQueue` instance. AMQP 0-9-1 does not provide a portable queue-listing API, so diff --git a/docs/manual/opentelemetry.md b/docs/manual/opentelemetry.md index e6288ba9c..7cbe70e08 100644 --- a/docs/manual/opentelemetry.md +++ b/docs/manual/opentelemetry.md @@ -296,15 +296,21 @@ Instrumented metrics Fedify records the following OpenTelemetry metrics: -| Metric name | Instrument | Unit | Description | -| -------------------------------------------- | ---------- | ----------- | --------------------------------------------------------------- | -| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | -| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | -| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | -| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | -| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | -| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | -| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | +| Metric name | Instrument | Unit | Description | +| -------------------------------------------- | ------------- | ----------- | --------------------------------------------------------------- | +| `activitypub.delivery.sent` | Counter | `{attempt}` | Counts outgoing ActivityPub delivery attempts. | +| `activitypub.delivery.permanent_failure` | Counter | `{failure}` | Counts outgoing deliveries abandoned as permanent failures. | +| `activitypub.delivery.duration` | Histogram | `ms` | Measures outgoing ActivityPub delivery attempt duration. | +| `activitypub.inbox.processing_duration` | Histogram | `ms` | Measures inbox listener processing duration. | +| `activitypub.signature.verification_failure` | Counter | `{failure}` | Counts failed signature verification for inbox requests. | +| `fedify.http.server.request.count` | Counter | `{request}` | Counts inbound HTTP requests handled by `Federation.fetch()`. | +| `fedify.http.server.request.duration` | Histogram | `ms` | Measures inbound HTTP request duration in `Federation.fetch()`. | +| `fedify.queue.task.enqueued` | Counter | `{task}` | Counts inbox, outbox, and fanout tasks Fedify enqueued. | +| `fedify.queue.task.started` | Counter | `{task}` | Counts queue tasks Fedify began processing as a worker. | +| `fedify.queue.task.completed` | Counter | `{task}` | Counts queue tasks Fedify finished processing without throwing. | +| `fedify.queue.task.failed` | Counter | `{task}` | Counts queue tasks Fedify abandoned because processing threw. | +| `fedify.queue.task.duration` | Histogram | `ms` | Measures queue task processing duration in Fedify workers. | +| `fedify.queue.task.in_flight` | UpDownCounter | `{task}` | Tracks queue tasks currently in flight in this Fedify process. | ### Metric attributes @@ -339,6 +345,60 @@ Fedify records the following OpenTelemetry metrics: parameter names (for example `/users/{identifier}`) rather than the matched parameter values. +`fedify.queue.task.enqueued`, `fedify.queue.task.started`, +`fedify.queue.task.completed`, `fedify.queue.task.failed`, and +`fedify.queue.task.duration` +: `fedify.queue.role` (`inbox`, `outbox`, or `fanout`) is always present. + `fedify.queue.backend` is the queue implementation's constructor name + (for example `RedisMessageQueue`) when available; it is omitted for + queues whose constructor is the plain `Object` (for example, + `MessageQueue` instances built from an object literal). + `fedify.queue.native_retrial` reflects the queue backend's `nativeRetrial` + flag when set on the queue. `activitypub.activity.type` is recorded + whenever Fedify knows the activity type for the queued message; for inbox + tasks the type only becomes available after the activity is parsed, so the + *started* counter for inbox tasks may be recorded without it. + `fedify.queue.task.enqueued` additionally carries a zero-based + `fedify.queue.task.attempt` so that retry re-enqueues are distinguishable + from initial enqueues. `fedify.queue.task.completed`, + `fedify.queue.task.failed`, and `fedify.queue.task.duration` carry + `fedify.queue.task.result`, which is `completed` when processing returned + without throwing, `failed` when the worker re-threw a non-abort error, and + `aborted` when the worker re-threw an `AbortError` (for example, because a + graceful-shutdown `AbortSignal` interrupted processing). When the queue + backend does not declare `nativeRetrial`, Fedify catches inbox listener and + outbox delivery errors itself; if its retry policy still allows another + attempt, it schedules a retry by re-enqueuing the message and returns from + the worker without re-throwing, so the worker boundary records + `result=completed`. When the retry policy gives up, the worker also + returns normally (`result=completed`) without scheduling a retry. + Outbox-side activity failures remain observable through the + `activitypub.delivery.*` metrics and the `activitypub.delivery.failed` + span event, and any retry attempt — inbox or outbox — appears as a + `fedify.queue.task.enqueued` measurement with a non-zero + `fedify.queue.task.attempt`. Inbox listener errors that the retry policy + abandons are visible through error logs and the inbox span's error status, + but not through a dedicated metric. + +`fedify.queue.task.in_flight` +: `fedify.queue.role` and `fedify.queue.backend` (when available), plus + `fedify.queue.native_retrial` when set on the queue. Per-message + attributes such as `activitypub.activity.type`, + `fedify.queue.task.attempt`, and `fedify.queue.task.result` are + deliberately omitted so that increment and decrement operations always + pair up cleanly per attribute series. This UpDownCounter is + process-local: it tracks tasks currently being processed *in this + Fedify process*, not cross-process totals. Aggregate it across + replicas in your metrics backend. + +The `fedify.queue.task.*` metrics describe what Fedify's workers do with +queued messages. They complement the backend-side +[`MessageQueue.getDepth()` API](./mq.md#queue-depth-reporting), which +reports how many messages are currently waiting in the queue backend. +Reading both signals together — task throughput plus backlog depth — +makes it possible to distinguish a small, slow queue from a large, fast +one and to set alerting thresholds for delivery latency under load. + Fedify records `activitypub.remote.host` as the URL hostname only; ports, paths, and query strings are deliberately excluded to keep metric cardinality bounded. Activity types use the same qualified URI form as Fedify's trace attributes, @@ -407,6 +467,11 @@ for ActivityPub: | `fedify.object.values.{parameter}` | string[] | The argument values of the object dispatcher. | `["1", "2"]` | | `fedify.collection.cursor` | string | The cursor of the collection. | `"eyJpZCI6IjEiLCJ0eXBlIjoiT3JkZXJlZENvbGxlY3Rpb24ifQ=="` | | `fedify.collection.items` | number | The number of items in the collection page. It can be less than the total items. | `10` | +| `fedify.queue.role` | string | The Fedify queue role for the task: `inbox`, `outbox`, or `fanout`. | `"outbox"` | +| `fedify.queue.backend` | string | The queue implementation's constructor name (best-effort backend identifier). | `"RedisMessageQueue"` | +| `fedify.queue.native_retrial` | boolean | Whether the queue backend declares `nativeRetrial`, meaning Fedify defers retry handling to the backend. | `true` | +| `fedify.queue.task.attempt` | int | The zero-based attempt number recorded on `fedify.queue.task.enqueued`; non-zero for retry re-enqueues. | `1` | +| `fedify.queue.task.result` | string | The terminal outcome of queue task processing: `completed`, `failed`, or `aborted`. | `"failed"` | | `http.redirect.url` | string | The redirect URL when a document fetch results in a redirect. | `"https://example.com/new-location"` | | `http.response.status_code` | int | The HTTP response status code. | `200` | | `http_signatures.signature` | string | The signature of the HTTP request in hexadecimal. | `"73a74c990beabe6e59cc68f9c6db7811b59cbb22fd12dcffb3565b651540efe9"` | diff --git a/packages/fedify/src/federation/handler.test.ts b/packages/fedify/src/federation/handler.test.ts index 95dc2b144..89bf2c9d5 100644 --- a/packages/fedify/src/federation/handler.test.ts +++ b/packages/fedify/src/federation/handler.test.ts @@ -57,6 +57,7 @@ import { import { ActivityListenerSet } from "./activity-listener.ts"; import { MemoryKvStore } from "./kv.ts"; import { createFederation } from "./middleware.ts"; +import type { MessageQueue } from "./mq.ts"; const QUOTE_CONTEXT_TERMS = { QuoteAuthorization: "https://w3id.org/fep/044f#QuoteAuthorization", @@ -2453,6 +2454,106 @@ test("handleInbox() records OpenTelemetry span events", async () => { ); }); +test("handleInbox() records fedify.queue.task.enqueued when queued", async () => { + const [meterProvider, recorder] = createTestMeterProvider(); + const kv = new MemoryKvStore(); + const federation = createFederation({ + kv, + meterProvider, + }); + + const activity = new Create({ + id: new URL("https://example.com/activities/queued"), + actor: new URL("https://example.com/users/someone"), + object: new Note({ + id: new URL("https://example.com/note-queued"), + content: "Queue me up", + }), + }); + + const request = new Request("https://example.com/users/someone/inbox", { + method: "POST", + headers: { "Content-Type": "application/activity+json" }, + body: JSON.stringify(await activity.toJsonLd()), + }); + const signed = await signRequest( + request, + rsaPrivateKey3, + new URL("https://example.com/users/someone#main-key"), + ); + + const context = createRequestContext({ + federation, + request: signed, + url: new URL(signed.url), + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: mockDocumentLoader, + getActorUri(identifier: string) { + return new URL(`https://example.com/users/${identifier}`); + }, + }); + + const actorDispatcher: ActorDispatcher = (ctx, identifier) => { + if (identifier !== "someone") return null; + return new Person({ + id: ctx.getActorUri(identifier), + name: "Someone", + inbox: new URL("https://example.com/users/someone/inbox"), + publicKey: rsaPublicKey2, + }); + }; + + const queuedMessages: unknown[] = []; + const queue: MessageQueue = { + enqueue(message, _options) { + queuedMessages.push(message); + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + + const response = await handleInbox(signed, { + recipient: "someone", + context, + inboxContextFactory(_activity) { + return createInboxContext({ ...context, clone: undefined }); + }, + kv, + kvPrefixes: { + activityIdempotence: ["activityIdempotence"], + publicKey: ["publicKey"], + acceptSignatureNonce: ["acceptSignatureNonce"], + }, + actorDispatcher, + inboxListeners: new ActivityListenerSet>(), + inboxErrorHandler: undefined, + onNotFound: (_request) => new Response("Not found", { status: 404 }), + signatureTimeWindow: false, + skipSignatureVerification: true, + queue, + meterProvider, + }); + + assertEquals(response.status, 202); + assertEquals(queuedMessages.length, 1); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + assertEquals(enqueued.length, 1); + assertEquals(enqueued[0].type, "counter"); + assertEquals(enqueued[0].attributes["fedify.queue.role"], "inbox"); + assertEquals(enqueued[0].attributes["fedify.queue.task.attempt"], 0); + assertEquals( + enqueued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + // The queue here is an object literal, so getQueueBackend() should omit the + // backend attribute rather than emit "Object". + assertEquals(enqueued[0].attributes["fedify.queue.backend"], undefined); +}); + test("handleInbox() records unverified HTTP signature details", async () => { const [tracerProvider, exporter] = createTestTracerProvider(); const [meterProvider, recorder] = createTestMeterProvider(); diff --git a/packages/fedify/src/federation/inbox.ts b/packages/fedify/src/federation/inbox.ts index db66692a8..955e2b5b5 100644 --- a/packages/fedify/src/federation/inbox.ts +++ b/packages/fedify/src/federation/inbox.ts @@ -178,6 +178,10 @@ export async function routeActivity( }); throw error; } + getFederationMetrics(meterProvider).recordQueueTaskEnqueued( + { role: "inbox", queue, activityType: getTypeId(activity).href }, + 0, + ); logger.info( "Activity {activityId} is enqueued.", { activityId: activity.id?.href, activity: json, recipient }, diff --git a/packages/fedify/src/federation/metrics.ts b/packages/fedify/src/federation/metrics.ts index 2decbc9c9..34169bcf2 100644 --- a/packages/fedify/src/federation/metrics.ts +++ b/packages/fedify/src/federation/metrics.ts @@ -4,8 +4,32 @@ import { type Histogram, type MeterProvider, metrics, + type UpDownCounter, } from "@opentelemetry/api"; import metadata from "../../deno.json" with { type: "json" }; +import type { MessageQueue } from "./mq.ts"; + +/** + * The role of a queued task, derived from the queued message's `type` field. + * @since 2.3.0 + */ +export type QueueTaskRole = "fanout" | "outbox" | "inbox"; + +/** + * The terminal result of a queued task processing attempt. + * @since 2.3.0 + */ +export type QueueTaskResult = "completed" | "failed" | "aborted"; + +/** + * Common attributes shared by all queue task metrics. + * @since 2.3.0 + */ +export interface QueueTaskCommonAttributes { + role: QueueTaskRole; + queue?: MessageQueue; + activityType?: string; +} class FederationMetrics { readonly deliverySent: Counter; @@ -15,6 +39,12 @@ class FederationMetrics { readonly inboxProcessingDuration: Histogram; readonly httpServerRequestCount: Counter; readonly httpServerRequestDuration: Histogram; + readonly queueTaskEnqueued: Counter; + readonly queueTaskStarted: Counter; + readonly queueTaskCompleted: Counter; + readonly queueTaskFailed: Counter; + readonly queueTaskDuration: Histogram; + readonly queueTaskInFlight: UpDownCounter; constructor(meterProvider: MeterProvider) { const meter = meterProvider.getMeter(metadata.name, metadata.version); @@ -84,6 +114,61 @@ class FederationMetrics { }, }, ); + this.queueTaskEnqueued = meter.createCounter("fedify.queue.task.enqueued", { + description: "Tasks Fedify enqueued for inbox, outbox, or fanout work.", + unit: "{task}", + }); + this.queueTaskStarted = meter.createCounter("fedify.queue.task.started", { + description: "Tasks Fedify began processing as a queue worker.", + unit: "{task}", + }); + this.queueTaskCompleted = meter.createCounter( + "fedify.queue.task.completed", + { + description: "Queue tasks Fedify finished processing without throwing.", + unit: "{task}", + }, + ); + this.queueTaskFailed = meter.createCounter("fedify.queue.task.failed", { + description: "Queue tasks Fedify abandoned because processing threw.", + unit: "{task}", + }); + this.queueTaskDuration = meter.createHistogram( + "fedify.queue.task.duration", + { + description: "Duration of queue task processing in Fedify workers.", + unit: "ms", + advice: { + // Reuse the OpenTelemetry HTTP server semantic-conventions buckets + // since queue task durations span a similar 5 ms to 10 s range + // (network-bound outbox delivery dominates the tail). + explicitBucketBoundaries: [ + 5, + 10, + 25, + 50, + 75, + 100, + 250, + 500, + 750, + 1000, + 2500, + 5000, + 7500, + 10000, + ], + }, + }, + ); + this.queueTaskInFlight = meter.createUpDownCounter( + "fedify.queue.task.in_flight", + { + description: + "Queue tasks currently being processed in this Fedify process.", + unit: "{task}", + }, + ); } recordDelivery( @@ -151,6 +236,123 @@ class FederationMetrics { this.httpServerRequestCount.add(1, attributes); this.httpServerRequestDuration.record(durationMs, attributes); } + + recordQueueTaskEnqueued( + common: QueueTaskCommonAttributes, + attempt: number, + ): void { + const attributes = buildQueueTaskAttributes(common); + attributes["fedify.queue.task.attempt"] = attempt; + this.queueTaskEnqueued.add(1, attributes); + } + + recordQueueTaskStarted(common: QueueTaskCommonAttributes): void { + this.queueTaskStarted.add(1, buildQueueTaskAttributes(common)); + } + + incrementQueueTaskInFlight(common: QueueTaskCommonAttributes): void { + this.queueTaskInFlight.add(1, buildQueueTaskInFlightAttributes(common)); + } + + decrementQueueTaskInFlight(common: QueueTaskCommonAttributes): void { + this.queueTaskInFlight.add(-1, buildQueueTaskInFlightAttributes(common)); + } + + recordQueueTaskOutcome( + common: QueueTaskCommonAttributes, + result: QueueTaskResult, + durationMs: number, + ): void { + const attributes = buildQueueTaskAttributes(common); + attributes["fedify.queue.task.result"] = result; + if (result === "completed") { + this.queueTaskCompleted.add(1, attributes); + } else if (result === "failed") { + this.queueTaskFailed.add(1, attributes); + } + this.queueTaskDuration.record(durationMs, attributes); + } +} + +function buildQueueTaskAttributes( + common: QueueTaskCommonAttributes, +): Attributes { + const attributes: Attributes = { + "fedify.queue.role": common.role, + }; + const backend = getQueueBackend(common.queue); + if (backend != null) { + attributes["fedify.queue.backend"] = backend; + } + const nativeRetrial = common.queue?.nativeRetrial; + if (typeof nativeRetrial === "boolean") { + attributes["fedify.queue.native_retrial"] = nativeRetrial; + } + if (common.activityType != null) { + attributes["activitypub.activity.type"] = common.activityType; + } + return attributes; +} + +function buildQueueTaskInFlightAttributes( + common: QueueTaskCommonAttributes, +): Attributes { + // The in-flight UpDownCounter is process-local and intentionally omits + // per-message attributes (activity type, attempt, result) so that + // increments and decrements pair up cleanly. + return buildQueueTaskAttributes({ role: common.role, queue: common.queue }); +} + +/** + * Returns the constructor name of the given message queue, when it is a + * meaningful identifier. Used as a best-effort `fedify.queue.backend` + * attribute on queue task metrics; returns `undefined` for plain object + * literals (whose constructor is `Object`) so the attribute does not appear + * with a non-informative value. + * @since 2.3.0 + */ +export function getQueueBackend(queue?: MessageQueue): string | undefined { + const name = queue?.constructor?.name; + if (name == null || name === "" || name === "Object") return undefined; + return name; +} + +/** + * Records `fedify.queue.task.enqueued` for an outgoing outbox enqueue. + * + * Both `Context.sendActivity()` and `OutboxContext.forwardActivity()` enqueue + * outbox messages with the same metric attributes (role, queue, activity + * type, attempt), so they share this helper rather than each defining a local + * closure. + * @since 2.3.0 + */ +export function recordOutboxEnqueue( + meterProvider: MeterProvider | undefined, + outboxQueue: MessageQueue, + message: { readonly activityType: string; readonly attempt: number }, +): void { + getFederationMetrics(meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: message.activityType, + }, + message.attempt, + ); +} + +/** + * Whether the given thrown value is an `AbortError`. + * + * `processQueuedTask` distinguishes aborted tasks (recorded as + * `fedify.queue.task.result=aborted`) from other failures so that backend + * shutdown signals do not inflate the `fedify.queue.task.failed` counter. + * @since 2.3.0 + */ +export function isAbortError(error: unknown): boolean { + if (error == null || typeof error !== "object") return false; + const name = (error as { name?: unknown }).name; + return typeof name === "string" && name === "AbortError"; } const KNOWN_HTTP_METHODS: ReadonlySet = new Set([ diff --git a/packages/fedify/src/federation/middleware.test.ts b/packages/fedify/src/federation/middleware.test.ts index 7de8556cc..e5339a43c 100644 --- a/packages/fedify/src/federation/middleware.test.ts +++ b/packages/fedify/src/federation/middleware.test.ts @@ -44,6 +44,7 @@ import { rsaPublicKey3, } from "../testing/keys.ts"; import { FetchError, getDocumentLoader } from "@fedify/vocab-runtime"; +import { SpanStatusCode } from "@opentelemetry/api"; import { getAuthenticatedDocumentLoader } from "../utils/docloader.ts"; const documentLoader = getDocumentLoader(); @@ -3129,9 +3130,11 @@ test("Federation.setOutboxListeners()", async (t) => { return Promise.resolve(); }, }; + const [meterProvider, recorder] = createTestMeterProvider(); const federation = new FederationImpl({ kv, contextLoaderFactory: () => mockDocumentLoader, + meterProvider, queue, }); federation @@ -3180,6 +3183,19 @@ test("Federation.setOutboxListeners()", async (t) => { assertEquals((enqueued[0] as OutboxMessage).actorIds, [ "https://remote.example/users/alice", ]); + + const enqueuedMetrics = recorder.getMeasurements( + "fedify.queue.task.enqueued", + ); + assertEquals(enqueuedMetrics.length, 1); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.role"], + "outbox", + ); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.task.attempt"], + 0, + ); }, ); }); @@ -3694,6 +3710,52 @@ test("FederationImpl.processQueuedTask()", async (t) => { durations[0].attributes["activitypub.activity.type"], "https://www.w3.org/ns/activitystreams#Create", ); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "inbox"); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals(completed[0].attributes["fedify.queue.role"], "inbox"); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + assertEquals( + completed[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + assertEquals( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + + const taskDurations = recorder.getMeasurements( + "fedify.queue.task.duration", + ); + assertEquals(taskDurations.length, 1); + assertEquals(taskDurations[0].type, "histogram"); + assertEquals(taskDurations[0].attributes["fedify.queue.role"], "inbox"); + assertEquals( + taskDurations[0].attributes["fedify.queue.task.result"], + "completed", + ); + + const inFlight = recorder.getMeasurements("fedify.queue.task.in_flight"); + assertEquals(inFlight.length, 2); + assertEquals(inFlight[0].type, "upDownCounter"); + assertEquals(inFlight[0].value, 1); + assertEquals(inFlight[1].value, -1); + // The increment and decrement attribute bags must match exactly so that + // the in-flight gauge always nets to zero per attribute series. + assertEquals(inFlight[0].attributes, inFlight[1].attributes); + assertEquals(inFlight[0].attributes["fedify.queue.role"], "inbox"); + assertEquals( + inFlight[0].attributes["activitypub.activity.type"], + undefined, + ); }); }); @@ -4076,6 +4138,422 @@ test("FederationImpl.processQueuedTask() permanent failure", async (t) => { fetchMock.hardReset(); }); +test("FederationImpl.processQueuedTask() queue task metrics", async (t) => { + await t.step( + "records failed result when worker re-throws (nativeRetrial)", + async () => { + // With nativeRetrial=true the worker leaves retry handling to the queue + // backend, so an inbox listener exception propagates back out of + // processQueuedTask and is recorded as a failed outcome. + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + nativeRetrial: true, + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await assertRejects( + () => + federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/2", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ), + Error, + ); + + assertEquals( + recorder.getMeasurements("fedify.queue.task.completed").length, + 0, + ); + const failed = recorder.getMeasurements("fedify.queue.task.failed"); + assertEquals(failed.length, 1); + assertEquals(failed[0].attributes["fedify.queue.role"], "inbox"); + assertEquals(failed[0].attributes["fedify.queue.task.result"], "failed"); + assertEquals(failed[0].attributes["fedify.queue.native_retrial"], true); + assertEquals( + failed[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + const taskDurations = recorder.getMeasurements( + "fedify.queue.task.duration", + ); + assertEquals(taskDurations.length, 1); + assertEquals( + taskDurations[0].attributes["fedify.queue.task.result"], + "failed", + ); + + const inFlight = recorder.getMeasurements("fedify.queue.task.in_flight"); + assertEquals(inFlight.length, 2); + assertEquals(inFlight[0].value, 1); + assertEquals(inFlight[1].value, -1); + assertEquals(inFlight[0].attributes, inFlight[1].attributes); + }, + ); + + await t.step( + "records completed when retry handler swallows listener error", + async () => { + // With nativeRetrial=false the worker schedules a retry and returns + // normally, so processQueuedTask records a completed outcome and a + // separate retry enqueue. + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queuedMessages: Message[] = []; + const queue: MessageQueue = { + enqueue(message, _options) { + queuedMessages.push(message); + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new Error("Intended error for testing"); + }); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/retry", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + assertEquals(queuedMessages.length, 1); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + + const enqueued = recorder.getMeasurements("fedify.queue.task.enqueued"); + assertEquals(enqueued.length, 1); + assertEquals(enqueued[0].attributes["fedify.queue.role"], "inbox"); + assertEquals(enqueued[0].attributes["fedify.queue.task.attempt"], 1); + assertEquals( + enqueued[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + }, + ); + + await t.step( + "records aborted result when worker re-throws AbortError", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const [tracerProvider, exporter] = createTestTracerProvider(); + const queue: MessageQueue = { + nativeRetrial: true, + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + tracerProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => { + throw new DOMException("aborted", "AbortError"); + }); + + await assertRejects( + () => + federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/3", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ), + DOMException, + ); + + assertEquals( + recorder.getMeasurements("fedify.queue.task.failed").length, + 0, + ); + assertEquals( + recorder.getMeasurements("fedify.queue.task.completed").length, + 0, + ); + const taskDurations = recorder.getMeasurements( + "fedify.queue.task.duration", + ); + assertEquals(taskDurations.length, 1); + assertEquals( + taskDurations[0].attributes["fedify.queue.task.result"], + "aborted", + ); + + // Per OpenTelemetry guidance, the inbox span should remain UNSET for + // cancellation and not flip into ERROR status. + const inboxSpans = exporter.getSpans("activitypub.inbox"); + assertEquals(inboxSpans.length, 1); + assertEquals(inboxSpans[0].status.code, SpanStatusCode.UNSET); + }, + ); + + await t.step("records native_retrial and backend attributes", async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + class TestMessageQueue implements MessageQueue { + readonly nativeRetrial = true; + enqueue(_message: unknown, _options?: unknown): Promise { + return Promise.resolve(); + } + listen(_handler: unknown, _options?: unknown): Promise { + return Promise.resolve(); + } + } + const federation = new FederationImpl({ + kv, + meterProvider, + queue: new TestMessageQueue(), + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox") + .on(vocab.Create, () => {}); + + await federation.processQueuedTask( + undefined, + { + type: "inbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://remote.example/activities/4", + actor: "https://remote.example/users/alice", + object: { type: "Note", content: "Hello world" }, + }, + started: new Date().toISOString(), + attempt: 0, + identifier: null, + traceContext: {}, + } satisfies InboxMessage, + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.backend"], + "TestMessageQueue", + ); + assertEquals(completed[0].attributes["fedify.queue.native_retrial"], true); + }); + + await t.step( + "records outbox worker metrics on successful delivery", + async () => { + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); + + fetchMock.spyGlobal(); + fetchMock.post("https://remote.example/inbox", { status: 202 }); + try { + await federation.processQueuedTask( + undefined, + { + type: "outbox", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [], + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/1", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + inbox: "https://remote.example/inbox", + sharedInbox: false, + started: new Date().toISOString(), + attempt: 0, + headers: {}, + traceContext: {}, + } satisfies OutboxMessage, + ); + } finally { + fetchMock.hardReset(); + } + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "outbox"); + assertEquals( + started[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + + // Successful outbox delivery should not re-enqueue, so no enqueued + // measurement is expected on this path. The guard catches accidental + // double-counting if the implementation ever changes. + assertEquals( + recorder.getMeasurements("fedify.queue.task.enqueued").length, + 0, + ); + }, + ); + + await t.step( + "records started/completed for a fanout task with no recipients", + async () => { + // A fanout task with no inboxes drops out before sendActivity validates + // keys, so the worker still completes successfully. + const kv = new MemoryKvStore(); + const [meterProvider, recorder] = createTestMeterProvider(); + const exportedKey = await crypto.subtle.exportKey( + "jwk", + rsaPrivateKey3, + ); + const queue: MessageQueue = { + enqueue(_message, _options) { + return Promise.resolve(); + }, + listen(_handler, _options) { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv, + meterProvider, + queue, + }); + federation.setInboxListeners("/users/{identifier}/inbox", "/inbox"); + + await federation.processQueuedTask(undefined, { + type: "fanout", + id: crypto.randomUUID(), + baseUrl: "https://example.com", + keys: [ + { + keyId: "https://example.com/users/alice#main-key", + privateKey: exportedKey, + }, + ], + inboxes: {}, + activity: { + "@context": "https://www.w3.org/ns/activitystreams", + type: "Create", + id: "https://example.com/activities/1", + actor: "https://example.com/users/alice", + object: { type: "Note", content: "test" }, + }, + activityType: "https://www.w3.org/ns/activitystreams#Create", + traceContext: {}, + }); + + const started = recorder.getMeasurements("fedify.queue.task.started"); + assertEquals(started.length, 1); + assertEquals(started[0].attributes["fedify.queue.role"], "fanout"); + assertEquals( + started[0].attributes["activitypub.activity.type"], + "https://www.w3.org/ns/activitystreams#Create", + ); + + const completed = recorder.getMeasurements("fedify.queue.task.completed"); + assertEquals(completed.length, 1); + assertEquals(completed[0].attributes["fedify.queue.role"], "fanout"); + assertEquals( + completed[0].attributes["fedify.queue.task.result"], + "completed", + ); + }, + ); +}); + test("ContextImpl.lookupObject()", async (t) => { // Note that this test only checks if allowPrivateAddress option affects // the ContextImpl.lookupObject() method. Other aspects of the method are @@ -5047,6 +5525,61 @@ test({ }, }); +test({ + name: "ContextImpl.routeActivity() forwards meterProvider to inbox enqueue", + permissions: { env: true, read: true }, + async fn() { + const [meterProvider, recorder] = createTestMeterProvider(); + const enqueued: Message[] = []; + const queue: MessageQueue = { + enqueue(message): Promise { + enqueued.push(message); + return Promise.resolve(); + }, + listen(): Promise { + return Promise.resolve(); + }, + }; + const federation = new FederationImpl({ + kv: new MemoryKvStore(), + meterProvider, + queue, + }); + federation.setInboxListeners("/u/{identifier}/i", "/i"); + + const ctx = new ContextImpl({ + url: new URL("https://example.com/"), + federation, + data: undefined, + documentLoader: mockDocumentLoader, + contextLoader: documentLoader, + }); + + const signedOffer = await signObject( + new vocab.Offer({ + actor: new URL("https://example.com/person2"), + }), + ed25519PrivateKey, + ed25519Multikey.id!, + ); + assert(await ctx.routeActivity(null, signedOffer)); + assertEquals(enqueued.length, 1); + + const enqueuedMetrics = recorder.getMeasurements( + "fedify.queue.task.enqueued", + ); + assertEquals(enqueuedMetrics.length, 1); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.role"], + "inbox", + ); + assertEquals( + enqueuedMetrics[0].attributes["fedify.queue.task.attempt"], + 0, + ); + }, +}); + test("ContextImpl.getCollectionUri()", () => { const federation = new FederationImpl({ kv: new MemoryKvStore() }); const base = "https://example.com"; diff --git a/packages/fedify/src/federation/middleware.ts b/packages/fedify/src/federation/middleware.ts index f29e3a84c..b75237bb8 100644 --- a/packages/fedify/src/federation/middleware.ts +++ b/packages/fedify/src/federation/middleware.ts @@ -107,6 +107,10 @@ import { getDurationMs, getFederationMetrics, getRemoteHost, + isAbortError, + type QueueTaskCommonAttributes, + type QueueTaskResult, + recordOutboxEnqueue, } from "./metrics.ts"; import type { MessageQueue } from "./mq.ts"; import { acceptsJsonLd } from "./negotiation.ts"; @@ -488,8 +492,14 @@ export class FederationImpl context.active(), message.traceContext, ); + const meter = getFederationMetrics(this.meterProvider); return withContext({ messageId: message.id }, async () => { if (message.type === "fanout") { + const common: QueueTaskCommonAttributes = { + role: "fanout", + queue: this.fanoutQueue, + activityType: message.activityType, + }; await tracer.startActiveSpan( "activitypub.fanout", { @@ -510,15 +520,29 @@ export class FederationImpl message.activityId, ); } + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + let outcome: QueueTaskResult = "completed"; try { await this.#listenFanoutMessage(contextData, message); } catch (e) { - span.setStatus({ - code: SpanStatusCode.ERROR, - message: String(e), - }); + const aborted = isAbortError(e); + outcome = aborted ? "aborted" : "failed"; + if (!aborted) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(e), + }); + } throw e; } finally { + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + ); + meter.decrementQueueTaskInFlight(common); span.end(); } }, @@ -526,6 +550,11 @@ export class FederationImpl }, ); } else if (message.type === "outbox") { + const common: QueueTaskCommonAttributes = { + role: "outbox", + queue: this.outboxQueue, + activityType: message.activityType, + }; await tracer.startActiveSpan( "activitypub.outbox", { @@ -547,15 +576,29 @@ export class FederationImpl message.activityId, ); } + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + let outcome: QueueTaskResult = "completed"; try { await this.#listenOutboxMessage(contextData, message, span); } catch (e) { - span.setStatus({ - code: SpanStatusCode.ERROR, - message: String(e), - }); + const aborted = isAbortError(e); + outcome = aborted ? "aborted" : "failed"; + if (!aborted) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(e), + }); + } throw e; } finally { + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + ); + meter.decrementQueueTaskInFlight(common); span.end(); } }, @@ -563,6 +606,10 @@ export class FederationImpl }, ); } else if (message.type === "inbox") { + const common: QueueTaskCommonAttributes = { + role: "inbox", + queue: this.inboxQueue, + }; await tracer.startActiveSpan( "activitypub.inbox", { @@ -577,15 +624,36 @@ export class FederationImpl return await withContext( { traceId: spanCtx.traceId, spanId: spanCtx.spanId }, async () => { + meter.recordQueueTaskStarted(common); + meter.incrementQueueTaskInFlight(common); + const startedAt = performance.now(); + let outcome: QueueTaskResult = "completed"; try { - await this.#listenInboxMessage(contextData, message, span); + await this.#listenInboxMessage( + contextData, + message, + span, + (activityType) => { + common.activityType = activityType; + }, + ); } catch (e) { - span.setStatus({ - code: SpanStatusCode.ERROR, - message: String(e), - }); + const aborted = isAbortError(e); + outcome = aborted ? "aborted" : "failed"; + if (!aborted) { + span.setStatus({ + code: SpanStatusCode.ERROR, + message: String(e), + }); + } throw e; } finally { + meter.recordQueueTaskOutcome( + common, + outcome, + getDurationMs(startedAt), + ); + meter.decrementQueueTaskInFlight(common); span.end(); } }, @@ -809,17 +877,29 @@ export class FederationImpl "#{attempt}); retry...:\n{error}", { ...logData, error }, ); - await this.outboxQueue?.enqueue( - { - ...message, - attempt: message.attempt + 1, - } satisfies OutboxMessage, - { - delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 - ? Temporal.Duration.from({ seconds: 0 }) - : delay, - }, - ); + const retryMessage = { + ...message, + attempt: message.attempt + 1, + } satisfies OutboxMessage; + const { outboxQueue } = this; + if (outboxQueue != null) { + await outboxQueue.enqueue( + retryMessage, + { + delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 + ? Temporal.Duration.from({ seconds: 0 }) + : delay, + }, + ); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { + role: "outbox", + queue: outboxQueue, + activityType: retryMessage.activityType, + }, + retryMessage.attempt, + ); + } } else { logger.error( "Failed to send activity {activityId} to {inbox} after {attempt} " + @@ -839,6 +919,7 @@ export class FederationImpl ctxData: TContextData, message: InboxMessage, span: Span, + onActivityType?: (activityType: string) => void, ): Promise { const logger = getLogger(["fedify", "federation", "inbox"]); const baseUrl = new URL(message.baseUrl); @@ -860,7 +941,9 @@ export class FederationImpl } } const activity = await Activity.fromJsonLd(message.activity, context); - span.setAttribute("activitypub.activity.type", getTypeId(activity).href); + const activityType = getTypeId(activity).href; + span.setAttribute("activitypub.activity.type", activityType); + onActivityType?.(activityType); if (activity.id != null) { span.setAttribute("activitypub.activity.id", activity.id.href); } @@ -897,7 +980,7 @@ export class FederationImpl ); span.setStatus({ code: SpanStatusCode.ERROR, - message: `Unsupported activity type: ${getTypeId(activity).href}`, + message: `Unsupported activity type: ${activityType}`, }); span.end(); return; @@ -905,7 +988,6 @@ export class FederationImpl const { class: cls, listener } = dispatched; span.updateName(`activitypub.dispatch_inbox_listener ${cls.name}`); try { - const activityType = getTypeId(activity).href; const started = performance.now(); try { await listener( @@ -976,17 +1058,29 @@ export class FederationImpl recipient: message.identifier, }, ); - await this.inboxQueue?.enqueue( - { - ...message, - attempt: message.attempt + 1, - } satisfies InboxMessage, - { - delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 - ? Temporal.Duration.from({ seconds: 0 }) - : delay, - }, - ); + const retryMessage = { + ...message, + attempt: message.attempt + 1, + } satisfies InboxMessage; + const { inboxQueue } = this; + if (inboxQueue != null) { + await inboxQueue.enqueue( + retryMessage, + { + delay: Temporal.Duration.compare(delay, { seconds: 0 }) < 0 + ? Temporal.Duration.from({ seconds: 0 }) + : delay, + }, + ); + getFederationMetrics(this.meterProvider).recordQueueTaskEnqueued( + { + role: "inbox", + queue: inboxQueue, + activityType, + }, + retryMessage.attempt, + ); + } } else { logger.error( "Failed to process the incoming activity {activityId} after " + @@ -1300,12 +1394,17 @@ export class FederationImpl messages.push({ message, orderingKey: messageOrderingKey }); } const { outboxQueue } = this; - if (outboxQueue.enqueueMany == null) { - const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) + // enqueueMany does not support per-message orderingKey, so fall back to + // individual enqueues whenever orderingKey is specified or the backend + // does not implement enqueueMany. + if (outboxQueue.enqueueMany == null || orderingKey != null) { + const promises: PromiseSettledResult[] = await Promise.allSettled( + messages.map(async (m) => { + await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); + recordOutboxEnqueue(this.meterProvider, outboxQueue, m.message); + }), ); - const results = await Promise.allSettled(promises); - const errors = results + const errors = promises .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { @@ -1322,39 +1421,17 @@ export class FederationImpl throw errors[0]; } } else { - // Note: enqueueMany does not support per-message orderingKey, - // so we fall back to individual enqueues when orderingKey is specified - if (orderingKey != null) { - const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) + try { + await outboxQueue.enqueueMany(messages.map((m) => m.message)); + } catch (error) { + logger.error( + "Failed to enqueue activity {activityId} to send later: {error}", + { activityId: activity.id!.href, error }, ); - const results = await Promise.allSettled(promises); - const errors = results - .filter((r) => r.status === "rejected") - .map((r) => (r as PromiseRejectedResult).reason); - if (errors.length > 0) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {errors}", - { activityId: activity.id!.href, errors }, - ); - if (errors.length > 1) { - throw new AggregateError( - errors, - `Failed to enqueue activity ${activityId} to send later.`, - ); - } - throw errors[0]; - } - } else { - try { - await outboxQueue.enqueueMany(messages.map((m) => m.message)); - } catch (error) { - logger.error( - "Failed to enqueue activity {activityId} to send later: {error}", - { activityId: activity.id!.href, error }, - ); - throw error; - } + throw error; + } + for (const m of messages) { + recordOutboxEnqueue(this.meterProvider, outboxQueue, m.message); } } } @@ -2640,6 +2717,14 @@ export class ContextImpl implements Context { message, { orderingKey: options.orderingKey }, ); + getFederationMetrics(this.federation.meterProvider).recordQueueTaskEnqueued( + { + role: "fanout", + queue: this.federation.fanoutQueue, + activityType: message.activityType, + }, + 0, + ); return true; } @@ -2853,6 +2938,7 @@ export class ContextImpl implements Context { kvPrefixes: this.federation.kvPrefixes, queue: this.federation.inboxQueue, span, + meterProvider: this.federation.meterProvider, tracerProvider: options.tracerProvider ?? this.tracerProvider, idempotencyStrategy: this.federation.idempotencyStrategy, }); @@ -3272,11 +3358,17 @@ async function forwardActivityInternal( } const { outboxQueue } = ctx.federation; if (outboxQueue.enqueueMany == null || orderingKey != null) { - const promises: Promise[] = messages.map((m) => - outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }) + const promises: PromiseSettledResult[] = await Promise.allSettled( + messages.map(async (m) => { + await outboxQueue.enqueue(m.message, { orderingKey: m.orderingKey }); + recordOutboxEnqueue( + ctx.federation.meterProvider, + outboxQueue, + m.message, + ); + }), ); - const results = await Promise.allSettled(promises); - const errors: unknown[] = results + const errors: unknown[] = promises .filter((r) => r.status === "rejected") .map((r) => (r as PromiseRejectedResult).reason); if (errors.length > 0) { @@ -3302,6 +3394,13 @@ async function forwardActivityInternal( ); throw error; } + for (const m of messages) { + recordOutboxEnqueue( + ctx.federation.meterProvider, + outboxQueue, + m.message, + ); + } } return true; }