From 934be00a0d5e0ac10cdafcef0c35c12c5dd54fcd Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 11 Jun 2026 18:19:43 +0100 Subject: [PATCH 1/2] fix(webapp): deliver realtime changes with current content when the read replica lags The realtime feed hydrates change rows from the read replica, and that read can race the replica's apply of the very write that triggered it. Subscribers then receive the previous change's content, and an isolated final change is only corrected by the backstop poll. Publishers now stamp change records with the committed row's updatedAt (taken from writes they already perform, no extra queries), the router waits out the measured replica lag before hydrating, and a tripwire retries stale reads, feeding observations back into the lag estimate. Exhausted retries deliver anyway (liveness over freshness) while echo re-hydrates emit the fresh row through the normal diff once the replica catches up. Also: metadata updates that write nothing no longer publish a change record, and buffered parent/root metadata operations now publish when the flusher writes them. --- .../realtime-replica-read-consistency.md | 6 + apps/webapp/app/env.server.ts | 22 ++ .../app/routes/api.v1.runs.$runId.metadata.ts | 34 ++- .../app/routes/api.v1.runs.$runId.tags.ts | 6 +- .../metadata/updateMetadata.server.ts | 80 +++++- .../metadata/updateMetadataInstance.server.ts | 12 + .../realtime/envChangeRouter.server.ts | 172 +++++++++++- .../nativeRealtimeClientInstance.server.ts | 58 +++- .../realtime/replicaLagEstimator.server.ts | 257 ++++++++++++++++++ .../webapp/app/v3/runEngineHandlers.server.ts | 20 +- .../test/realtime/envChangeRouter.test.ts | 187 +++++++++++++ .../test/realtime/replicaLagEstimator.test.ts | 164 +++++++++++ 12 files changed, 987 insertions(+), 31 deletions(-) create mode 100644 .server-changes/realtime-replica-read-consistency.md create mode 100644 apps/webapp/app/services/realtime/replicaLagEstimator.server.ts create mode 100644 apps/webapp/test/realtime/replicaLagEstimator.test.ts diff --git a/.server-changes/realtime-replica-read-consistency.md b/.server-changes/realtime-replica-read-consistency.md new file mode 100644 index 00000000000..d23c73e682d --- /dev/null +++ b/.server-changes/realtime-replica-read-consistency.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Realtime feed reads now wait out measured read-replica lag and retry stale reads, so subscribers receive each change's current content instead of trailing one change behind when a read replica races the write. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 79634926835..d9c97711940 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -338,6 +338,28 @@ const EnvironmentSchema = z // TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend. REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000), REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000), + // "1" enables the read-your-writes gate: wake hydrates wait out the measured replica lag + // (anchored to the change record's updatedAtMs) and stale reads are retried. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED: z.string().default("1"), + // Reader-side lag probe cadence while the router is active; probing pauses when idle. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS: z.coerce.number().int().default(250), + REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS: z.coerce.number().int().default(30_000), + // The lag estimate is the max sample inside this window (spikes widen it immediately). + REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS: z.coerce.number().int().default(5_000), + // Estimate before the first sample lands (and the floor when probing is unavailable). + REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS: z.coerce.number().int().default(30), + // Safety margin (clock skew + scheduling) added on top of the lag estimate. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS: z.coerce.number().int().default(10), + // Hard cap on any single gate delay — a sick replica degrades freshness, never liveness. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS: z.coerce.number().int().default(1_000), + // Re-hydrate attempts for rows the tripwire still finds stale after the delay. + REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES: z.coerce.number().int().default(3), + // How long a tripwire-observed staleness floors the lag estimate (vanilla-PG replicas + // can't measure mid-apply lag, so observations carry the estimate between races). + REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS: z.coerce + .number() + .int() + .default(60_000), PUBSUB_REDIS_HOST: z .string() diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index d3d92d1fe5d..3f22929aca9 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -100,7 +100,20 @@ export async function routeOperationsToRun( const [error, result] = await tryCatch( updateMetadataService.call(targetRunId, { operations }, env) ); - if (!error && result !== undefined) return; + if (!error && result !== undefined) { + // The parent/root run changed too — wake its live feeds (only when something was + // actually written here; buffered writes publish from the flusher). + if (result.updatedAtMs !== undefined) { + publishChangeRecord({ + runId: result.runId, + envId: env.id, + tags: result.runTags, + batchId: result.batchId, + updatedAtMs: result.updatedAtMs, + }); + } + return; + } if (error) { // PG threw — auxiliary op, stay best-effort and don't surface this @@ -186,13 +199,18 @@ const { action } = createActionApiRoute( } if (pgResult) { // Reflect metadata.set() on a live feed before the next lifecycle event. Publish the - // internal id (the router keys single-run feeds by it, not the friendly id from the URL). - publishChangeRecord({ - runId: pgResult.runId, - envId: env.id, - tags: pgResult.runTags, - batchId: pgResult.batchId, - }); + // internal id (the router keys single-run feeds by it, not the friendly id from the + // URL) with the committed updatedAt as the read-your-writes watermark. No write + // (no-op body, or ops buffered for the flusher) means nothing to announce here. + if (pgResult.updatedAtMs !== undefined) { + publishChangeRecord({ + runId: pgResult.runId, + envId: env.id, + tags: pgResult.runTags, + batchId: pgResult.batchId, + updatedAtMs: pgResult.updatedAtMs, + }); + } return json({ metadata: pgResult.metadata }, { status: 200 }); } diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index e98d3f35823..f984562eb3d 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -84,20 +84,22 @@ export async function action({ request, params }: ActionFunctionArgs) { if (newTags.length === 0) { return json({ message: "No new tags to add" }, { status: 200 }); } - await prisma.taskRun.update({ + const updated = await prisma.taskRun.update({ where: { id: taskRun.id, runtimeEnvironmentId: env.id, }, data: { runTags: { push: newTags } }, + select: { updatedAt: true }, }); // Publish a run-changed record with the NEW tag set so tag feeds reindex - // (no-op unless enabled). + // (no-op unless enabled). updatedAt is the read-your-writes watermark. publishChangeRecord({ runId: taskRun.id, envId: env.id, tags: existing.concat(newTags), batchId: taskRun.batchId, + updatedAtMs: updated.updatedAt.getTime(), }); return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); }, diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts index 3948da046f9..7b87034a301 100644 --- a/apps/webapp/app/services/metadata/updateMetadata.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -30,6 +30,16 @@ export type UpdateMetadataServiceOptions = { maximumSize?: number; logger?: Logger; logLevel?: LogLevel; + /** Called after the batched flusher writes a run's buffered operations, with everything + * a realtime change record needs — buffered (parent/root) updates otherwise never wake + * live feeds. */ + onRunFlushed?: (run: { + runId: string; + environmentId: string; + tags: string[]; + batchId: string | null; + updatedAtMs: number; + }) => void; // Testing hooks onBeforeUpdate?: (runId: string) => Promise; onAfterRead?: (runId: string, metadataVersion: number) => Promise; @@ -172,12 +182,20 @@ export class UpdateMetadataService { operations: BufferedRunMetadataChangeOperation[] ) => { return Effect.gen(this, function* (_) { - // Fetch current run + // Fetch current run (+ the realtime membership keys, so a flush can publish) const run = yield* _( Effect.tryPromise(() => this._prisma.taskRun.findFirst({ where: { id: runId }, - select: { id: true, metadata: true, metadataType: true, metadataVersion: true }, + select: { + id: true, + metadata: true, + metadataType: true, + metadataVersion: true, + runtimeEnvironmentId: true, + runTags: true, + batchId: true, + }, }) ) ); @@ -237,6 +255,9 @@ export class UpdateMetadataService { yield* _(Effect.tryPromise(() => this.options.onBeforeUpdate!(runId))); } + // Stamp updatedAt explicitly so the realtime publish can carry the exact committed + // value without a follow-up read (updateMany can't RETURNING). + const writeTime = new Date(); const result = yield* _( Effect.tryPromise(() => this._prisma.taskRun.updateMany({ @@ -247,6 +268,7 @@ export class UpdateMetadataService { data: { metadata: newMetadataPacket.data, metadataVersion: { increment: 1 }, + updatedAt: writeTime, }, }) ) @@ -262,6 +284,16 @@ export class UpdateMetadataService { return yield* _(Effect.fail(new Error("Optimistic lock failed"))); } + yield* Effect.sync(() => { + this.options.onRunFlushed?.({ + runId, + environmentId: run.runtimeEnvironmentId, + tags: run.runTags, + batchId: run.batchId, + updatedAtMs: writeTime.getTime(), + }); + }); + return result; }); }; @@ -346,7 +378,7 @@ export class UpdateMetadataService { this.#ingestRunOperations(taskRun.rootTaskRun?.id ?? taskRun.id, body.rootOperations); } - const newMetadata = await this.#updateRunMetadata({ + const result = await this.#updateRunMetadata({ runId: taskRun.id, body, existingMetadata: { @@ -356,11 +388,14 @@ export class UpdateMetadataService { }); return { - metadata: newMetadata, + metadata: result?.metadata, // Internal id + membership keys, so callers can publish full realtime records the router routes by index. runId: taskRun.id, batchId: taskRun.batchId, runTags: taskRun.runTags, + // The committed row's updatedAt — the realtime watermark. Undefined when nothing was + // written here (no-op, or buffered for the flusher, which publishes itself). + updatedAtMs: result?.updatedAtMs, }; } @@ -372,7 +407,7 @@ export class UpdateMetadataService { runId: string; body: UpdateMetadataRequestBody; existingMetadata: IOPacket; - }) { + }): Promise<{ metadata: Record | undefined; updatedAtMs?: number }> { if (Array.isArray(body.operations)) { return this.#updateRunMetadataWithOperations(runId, body.operations); } else { @@ -380,7 +415,10 @@ export class UpdateMetadataService { } } - async #updateRunMetadataWithOperations(runId: string, operations: RunMetadataChangeOperation[]) { + async #updateRunMetadataWithOperations( + runId: string, + operations: RunMetadataChangeOperation[] + ): Promise<{ metadata: Record | undefined; updatedAtMs?: number }> { const MAX_RETRIES = 3; let attempts = 0; @@ -408,9 +446,9 @@ export class UpdateMetadataService { // Apply operations to the current metadata const applyResults = applyMetadataOperations(currentMetadata, operations); - // If no operations were applied, return the current metadata + // If no operations were applied, return the current metadata (nothing written) if (applyResults.unappliedOperations.length === operations.length) { - return currentMetadata; + return { metadata: currentMetadata }; } const newMetadataPacket = handleMetadataPacket( @@ -428,7 +466,9 @@ export class UpdateMetadataService { await this.options.onBeforeUpdate(runId); } - // Update with optimistic locking + // Update with optimistic locking; updatedAt stamped explicitly so the caller can + // publish the exact committed watermark without a follow-up read. + const writeTime = new Date(); const result = await this._prisma.taskRun.updateMany({ where: { id: runId, @@ -440,6 +480,7 @@ export class UpdateMetadataService { metadataVersion: { increment: 1, }, + updatedAt: writeTime, }, }); @@ -454,9 +495,10 @@ export class UpdateMetadataService { } // If this was our last attempt, buffer the operations and return optimistically + // (no watermark — the flusher writes later and publishes itself). if (attempts === MAX_RETRIES) { this.#ingestRunOperations(runId, operations); - return applyResults.newMetadata; + return { metadata: applyResults.newMetadata }; } // Otherwise sleep and try again @@ -474,8 +516,10 @@ export class UpdateMetadataService { } // Success! Return the new metadata - return applyResults.newMetadata; + return { metadata: applyResults.newMetadata, updatedAtMs: writeTime.getTime() }; } + + return { metadata: undefined }; } // Checks to see if a run is updatable @@ -493,7 +537,7 @@ export class UpdateMetadataService { runId: string, body: UpdateMetadataRequestBody, existingMetadata: IOPacket - ) { + ): Promise<{ metadata: Record | undefined; updatedAtMs?: number }> { const metadataPacket = handleMetadataPacket( body.metadata, "application/json", @@ -501,9 +545,11 @@ export class UpdateMetadataService { ); if (!metadataPacket) { - return {}; + return { metadata: {} }; } + let updatedAtMs: number | undefined; + if ( metadataPacket.data !== "{}" || (existingMetadata.data && metadataPacket.data !== existingMetadata.data) @@ -515,7 +561,9 @@ export class UpdateMetadataService { }); } - // Update the metadata without version check + // Update the metadata without version check; updatedAt stamped explicitly so the + // caller can publish the exact committed watermark. + const writeTime = new Date(); await this._prisma.taskRun.update({ where: { id: runId, @@ -526,12 +574,14 @@ export class UpdateMetadataService { metadataVersion: { increment: 1, }, + updatedAt: writeTime, }, }); + updatedAtMs = writeTime.getTime(); } const newMetadata = await parsePacket(metadataPacket); - return newMetadata; + return { metadata: newMetadata, updatedAtMs }; } #ingestRunOperations(runId: string, operations: RunMetadataChangeOperation[]) { diff --git a/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts b/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts index fe7c3b63f5f..9f1818e5ed3 100644 --- a/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts @@ -2,6 +2,7 @@ import { singleton } from "~/utils/singleton"; import { env } from "~/env.server"; import { UpdateMetadataService } from "./updateMetadata.server"; import { prisma } from "~/db.server"; +import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server"; export const updateMetadataService = singleton( "update-metadata-service", @@ -13,5 +14,16 @@ export const updateMetadataService = singleton( flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1", maximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE, logLevel: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1" ? "debug" : "info", + // Buffered (parent/root) operations land via the flusher, not the caller's request — + // publish here so those changes wake live feeds too (no-op when the backend is off). + onRunFlushed: (run) => { + publishChangeRecord({ + runId: run.runId, + envId: run.environmentId, + tags: run.tags, + batchId: run.batchId, + updatedAtMs: run.updatedAtMs, + }); + }, }) ); diff --git a/apps/webapp/app/services/realtime/envChangeRouter.server.ts b/apps/webapp/app/services/realtime/envChangeRouter.server.ts index 587f0e55c5a..6c5969e02c6 100644 --- a/apps/webapp/app/services/realtime/envChangeRouter.server.ts +++ b/apps/webapp/app/services/realtime/envChangeRouter.server.ts @@ -51,6 +51,25 @@ export type EnvChangeRouterOptions = { /** Observability: a buffered record was evicted. `cap` evictions mean the env churns more * runs inside the window than the buffer holds (the replay guarantee is degrading). */ onReplayEviction?: (reason: "cap" | "window") => void; + /** Read-your-writes gate over the replica: delays wake-path hydrates until the replica + * should have applied the change (record.updatedAtMs + lag + margin), and re-hydrates + * rows the tripwire still finds stale. Omit to hydrate immediately (legacy behavior). */ + replicaLag?: ReplicaLagGate; +}; + +export type ReplicaLagGate = { + /** Current replica-lag estimate (ms). */ + getLagMs(): number; + /** Feedback: a hydrate provably read at least this far behind the primary. */ + noteObservedLagMs(lagMs: number): void; + /** Safety margin added on top of the estimate (clock skew + scheduling). */ + marginMs: number; + /** Hard cap on any single gate delay — a sick replica degrades freshness, never liveness. */ + maxDelayMs: number; + /** Re-hydrate attempts for rows that still read stale after the delay. */ + staleRetries: number; + /** Observability: stale rows recovered by a retry, or delivered stale after exhausting them. */ + onStaleHydrate?: (outcome: "recovered" | "gave_up", runCount: number) => void; }; const DEFAULT_REPLAY_WINDOW_MS = 2_000; @@ -98,6 +117,13 @@ type EnvState = { lingerTimer?: ReturnType; }; +function sleepMs(ms: number): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms); + timer.unref?.(); + }); +} + function addToIndex(index: Map>, key: string, feed: Feed) { let set = index.get(key); if (!set) { @@ -322,6 +348,24 @@ export class EnvChangeRouter { } } + /** How long to wait before hydrating so the replica has applied every change in the + * batch: each record is safe at updatedAtMs + lag + margin (records without a watermark + * anchor at now, degrading to a plain lag-sized delay). Capped — see ReplicaLagGate. */ + #gateDelayMs(records: ChangeRecord[]): number { + const gate = this.options.replicaLag; + if (!gate || records.length === 0) { + return 0; + } + const now = Date.now(); + const lagMs = gate.getLagMs(); + let safeAtMs = 0; + for (const record of records) { + const anchorMs = record.updatedAtMs ?? now; + safeAtMs = Math.max(safeAtMs, anchorMs + lagMs + gate.marginMs); + } + return Math.max(0, Math.min(safeAtMs - now, gate.maxDelayMs)); + } + /** Deliver buffered records newer than the feed's cursor through the normal * hydrate -> serialize -> settle pipeline. Already-seen rows diff to nothing downstream. */ async #replayRecent(environmentId: string, env: EnvState, feed: Feed) { @@ -329,15 +373,28 @@ export class EnvChangeRouter { feed.replayCursorMs = Date.now(); const runIds: string[] = []; + const candidateRecords: ChangeRecord[] = []; for (const [runId, entry] of env.recent) { if (entry.receivedAtMs > cursor && this.#recordMatchesFeed(entry.record, feed)) { runIds.push(runId); + candidateRecords.push(entry.record); } } if (runIds.length === 0 || !feed.resolve) { return; } + // Replayed records are usually past the lag window already (delay computes to 0); a + // just-buffered one gets the same read-your-writes gate as the live path. No tripwire + // here — a stale replay diffs to a re-emission on the next wake or backstop. + const replayDelayMs = this.#gateDelayMs(candidateRecords); + if (replayDelayMs > 0) { + await sleepMs(replayDelayMs); + if (!feed.resolve) { + return; + } + } + const hydrated = await this.options.hydrator.hydrateByIds( environmentId, runIds, @@ -399,7 +456,17 @@ export class EnvChangeRouter { } } - async #onBatch(environmentId: string, env: EnvState, records: ChangeRecord[]) { + async #onBatch(environmentId: string, env: EnvState, records: ChangeRecord[], attempt = 0) { + // 0. Read-your-writes gate: wait out the replica's apply lag before hydrating, so the + // rows we read contain the changes the records announce. Retry attempts were + // scheduled with their own delay, so only the first pass gates here. + if (attempt === 0) { + const delayMs = this.#gateDelayMs(records); + if (delayMs > 0) { + await sleepMs(delayMs); + } + } + // 1. Route each record to the held feeds it matches; collect matched runIds per feed. const matchedRunIdsByFeed = new Map>(); const addMatch = (feed: Feed, runId: string) => { @@ -485,6 +552,63 @@ export class EnvChangeRouter { }) ); + // 3.5 Stale tripwire: a watermarked record whose hydrated row is older (or missing — + // the insert race) read a replica that hadn't applied the change. Withhold those + // rows and re-hydrate shortly. Exhausting the retry budget delivers what we have + // (liveness over freshness) — but a stale emission advances the feed's cursor, so + // it ALSO schedules echo passes past the gate: re-hydrates flowing through normal + // emission, where the working-set diff drops unchanged rows and emits the fresh + // version once the replica catches up. The backstop stays the terminal net. + // Each detection feeds the lag estimator. + const gate = this.options.replicaLag; + const isEchoPass = gate !== undefined && attempt > gate.staleRetries; + const staleRunIds = gate + ? this.#detectStaleRuns(records, runIdsByColumnSig, hydratedByColumnSig) + : new Set(); + if (attempt > 0 && !isEchoPass) { + const recovered = new Set(records.map((r) => r.runId)).size - staleRunIds.size; + if (recovered > 0) { + gate?.onStaleHydrate?.("recovered", recovered); + } + } + if (staleRunIds.size > 0 && gate) { + const staleRecords = records.filter((record) => staleRunIds.has(record.runId)); + // Re-buffer the withheld records so a feed that re-arms between now and the next + // pass replays them instead of waiting for its backstop. + this.#bufferRecent(env, staleRecords); + if (attempt >= gate.staleRetries) { + // Budget exhausted: deliver the stale rows below (liveness) — but a stale emission + // advances the feed's cursor, so keep echoing re-hydrates through normal emission + // (the working-set diff drops unchanged rows, emits the fresh version when the + // replica catches up). Echoes stop once the change ages past the horizon; deeper + // outages are the backstop's job. + if (attempt === gate.staleRetries) { + gate.onStaleHydrate?.("gave_up", staleRunIds.size); + } + staleRunIds.clear(); + } + const echoHorizonMs = gate.maxDelayMs * 10; + const newestWatermarkMs = Math.max( + ...staleRecords.map((record) => record.updatedAtMs ?? 0) + ); + const withinEchoHorizon = Date.now() - newestWatermarkMs < echoHorizonMs; + if (attempt < gate.staleRetries || withinEchoHorizon) { + const retryDelayMs = Math.max( + 25, + Math.min(gate.getLagMs() + gate.marginMs, gate.maxDelayMs) + ); + const timer = setTimeout(() => { + this.#onBatch(environmentId, env, staleRecords, attempt + 1).catch((error) => { + logger.error("[envChangeRouter] failed to re-hydrate stale rows", { + environmentId, + error, + }); + }); + }, retryDelayMs); + timer.unref?.(); + } + } + // 4. Assemble each feed's matched rows (post-filtering tag feeds against the // authoritative hydrated row) and resolve its pending wait. for (const [feed, runIds] of matchedRunIdsByFeed) { @@ -496,6 +620,9 @@ export class EnvChangeRouter { const rows: MatchedRow[] = []; for (const runId of runIds) { + if (staleRunIds.has(runId)) { + continue; // withheld; the scheduled re-hydrate delivers the fresh version + } const matched = hydrated.get(runId); if (!matched) continue; // run not found / left the table if (feed.filter.kind === "tag" && !this.#tagRowMatches(matched.row, feed.filter)) { @@ -512,6 +639,49 @@ export class EnvChangeRouter { } } + /** Runs whose hydrated row is provably behind its record's watermark (stale content), + * or absent entirely despite a watermark (the insert hasn't applied). Records without + * `updatedAtMs` can't be judged and always pass. */ + #detectStaleRuns( + records: ChangeRecord[], + runIdsByColumnSig: Map }>, + hydratedByColumnSig: Map> + ): Set { + const gate = this.options.replicaLag; + const stale = new Set(); + if (!gate) { + return stale; + } + const expectedByRunId = new Map(); + for (const record of records) { + if (record.updatedAtMs !== undefined) { + const existing = expectedByRunId.get(record.runId); + if (existing === undefined || record.updatedAtMs > existing) { + expectedByRunId.set(record.runId, record.updatedAtMs); + } + } + } + if (expectedByRunId.size === 0) { + return stale; + } + const now = Date.now(); + for (const [columnSig, group] of runIdsByColumnSig) { + const hydrated = hydratedByColumnSig.get(columnSig); + for (const runId of group.runIds) { + const expected = expectedByRunId.get(runId); + if (expected === undefined || stale.has(runId)) { + continue; + } + const matched = hydrated?.get(runId); + if (!matched || matched.row.updatedAt.getTime() < expected) { + stale.add(runId); + gate.noteObservedLagMs(now - expected); + } + } + } + return stale; + } + /** Authoritative re-check for tag feeds: the hydrated row carries ALL the filter's tags * (Electric's `runTags @> ARRAY[...]` semantics) and its createdAt is within the window. */ #tagRowMatches(row: RealtimeRunRow, filter: Extract): boolean { diff --git a/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts b/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts index c41149f0cc6..012c28c08fc 100644 --- a/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts +++ b/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts @@ -5,11 +5,12 @@ import { singleton } from "~/utils/singleton"; import { getCachedLimit } from "../platform.v3.server"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { ClickHouseRunListResolver } from "./clickHouseRunListResolver.server"; -import { EnvChangeRouter } from "./envChangeRouter.server"; +import { EnvChangeRouter, type EnvChangeSource } from "./envChangeRouter.server"; import { NativeRealtimeClient } from "./nativeRealtimeClient.server"; import { RealtimeConcurrencyLimiter } from "./realtimeConcurrencyLimiter.server"; import { getRunChangeNotifier } from "./runChangeNotifierInstance.server"; import { RedisReplayCursorStore } from "./replayCursorStore.server"; +import { createPostgresReplicaLagSource, ReplicaLagEstimator } from "./replicaLagEstimator.server"; import { RunHydrator } from "./runReader.server"; // Process-singleton wiring for the native realtime client; only constructed when a @@ -83,6 +84,11 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { "Shared replay-cursor store operations by outcome. Errors degrade hops to cold resolves (watch live_polls{path='cold-resolve'} rise with them), never failed polls.", }); + const staleHydrates = meter.createCounter("realtime_native.stale_hydrates", { + description: + "Wake hydrates the read-your-writes tripwire caught reading behind the publish. 'recovered' = a retry delivered the fresh row; sustained 'gave_up' means replica lag is outrunning the retry budget.", + }); + const limiter = new RealtimeConcurrencyLimiter({ keyPrefix: "tr:realtime:native:concurrency", redis: { @@ -120,8 +126,37 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { maxCacheEntries: env.REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES, }); + // Read-your-writes gate: the estimator samples replica lag (reader-side only, paused + // when idle) and the router delays wake hydrates by it, anchored to each record's + // updatedAtMs — so a publish racing the replica's apply is waited out, not read stale. + const lagEstimator = + env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED === "1" + ? new ReplicaLagEstimator({ + source: createPostgresReplicaLagSource($replica), + sampleIntervalMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS, + idleAfterMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS, + windowMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS, + defaultLagMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS, + observedFloorTtlMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS, + }) + : undefined; + + // The notifier wrapped so router activity keeps the lag sampler warm. + const notifier = getRunChangeNotifier(); + const source: EnvChangeSource = lagEstimator + ? { + subscribeToEnv(environmentId, onBatch) { + lagEstimator.touch(); + return notifier.subscribeToEnv(environmentId, (records) => { + lagEstimator.touch(); + onBatch(records); + }); + }, + } + : notifier; + const router = new EnvChangeRouter({ - source: getRunChangeNotifier(), + source, hydrator: runReader, onHydrate: (runCount) => routerHydrates.add(runCount), replayWindowMs: env.REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS, @@ -129,6 +164,16 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { unsubscribeLingerMs: env.REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS, onReplay: (result) => replays.add(1, { result }), onReplayEviction: (reason) => replayEvictions.add(1, { reason }), + replicaLag: lagEstimator + ? { + getLagMs: () => lagEstimator.getLagMs(), + noteObservedLagMs: (lagMs) => lagEstimator.noteObservedLagMs(lagMs), + marginMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS, + maxDelayMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS, + staleRetries: env.REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES, + onStaleHydrate: (outcome, runCount) => staleHydrates.add(runCount, { outcome }), + } + : undefined, }); const client = new NativeRealtimeClient({ @@ -208,6 +253,15 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { }) .addCallback((result) => result.observe(router.activeEnvCount)); + if (lagEstimator) { + meter + .createObservableGauge("realtime_native.replica_lag_estimate_ms", { + description: + "The read-your-writes gate's current replica-lag estimate (max sample in the window). Wake hydrates are delayed by roughly this much past each change's commit.", + }) + .addCallback((result) => result.observe(lagEstimator.getLagMs())); + } + return client; } diff --git a/apps/webapp/app/services/realtime/replicaLagEstimator.server.ts b/apps/webapp/app/services/realtime/replicaLagEstimator.server.ts new file mode 100644 index 00000000000..17aa7e2b893 --- /dev/null +++ b/apps/webapp/app/services/realtime/replicaLagEstimator.server.ts @@ -0,0 +1,257 @@ +import { logger } from "~/services/logger.server"; + +/** + * ReplicaLagEstimator — tracks how far the read replica trails the primary so the + * EnvChangeRouter can delay wake-path hydrates just long enough to read their own writes. + * Two inputs: a ReplicaLagSource (active, reader-side only — never queries the primary) + * sampled on an interval while the router is busy, and passive observations fed back by + * the router's stale-hydrate tripwire. The estimate is the max over a short window — + * floored by recent observations — so spikes widen the delay immediately and decay back + * out as fresh samples land. + */ + +type RawQueryable = { + $queryRawUnsafe(query: string): Promise; +}; + +/** A dialect-specific reader-side lag measure. `sampleLagMs()` returns the current lag in + * ms, or undefined when lag is genuinely unmeasurable right now (NOT an error — errors + * throw, and the composing source uses them to rule the dialect out). */ +export interface ReplicaLagSource { + readonly name: string; + sampleLagMs(): Promise; +} + +/** Aurora: replicas share the storage layer and reject every standard WAL function; + * `aurora_replica_status()` is the only live lag source. Max across readers, since the + * `$replica` pool balances over all of them. No reader rows = `$replica` is the writer = + * no lag. Throws on non-Aurora (the function doesn't exist). */ +export class AuroraReplicaLagSource implements ReplicaLagSource { + readonly name = "aurora"; + + constructor(private readonly db: RawQueryable) {} + + async sampleLagMs(): Promise { + const rows = await this.db.$queryRawUnsafe<{ lag: number | null }[]>( + `SELECT max(replica_lag_in_msec)::float8 AS lag FROM aurora_replica_status() WHERE session_id <> 'MASTER_SESSION_ID' AND replica_lag_in_msec IS NOT NULL` + ); + const lag = rows[0]?.lag; + return typeof lag === "number" && Number.isFinite(lag) ? Math.max(0, lag) : 0; + } +} + +/** Vanilla PG streaming replication. A primary (not in recovery — no replica configured, + * `$replica` is the writer) has no lag by definition; a caught-up replica (receive LSN == + * replay LSN) reports 0. Mid-apply there is NO honest reader-side timestamp measure — + * `now() - pg_last_xact_replay_timestamp()` reads as the full inter-write gap on + * low-traffic systems, which (measured locally) pins the estimate at the delay cap — so + * mid-apply reports undefined and the tripwire's observed-staleness floor carries the + * estimate instead. */ +export class VanillaPgReplicaLagSource implements ReplicaLagSource { + readonly name = "vanilla-pg"; + + constructor(private readonly db: RawQueryable) {} + + async sampleLagMs(): Promise { + const rows = await this.db.$queryRawUnsafe<{ caught_up: boolean | null }[]>( + `SELECT CASE + WHEN NOT pg_is_in_recovery() THEN true + WHEN pg_last_wal_receive_lsn() IS NOT DISTINCT FROM pg_last_wal_replay_lsn() THEN true + ELSE false + END AS caught_up` + ); + return rows[0]?.caught_up ? 0 : undefined; + } +} + +/** Composes dialect sources: the first whose sample succeeds is selected and used from + * then on; a database where none work degrades to never-measuring (the estimator then + * runs on its default + tripwire observations). Selection is by thrown-vs-returned — + * sources throw on unsupported dialects and return undefined for "can't measure now". */ +export class FirstSupportedReplicaLagSource implements ReplicaLagSource { + /** undefined = not probed yet; null = no candidate works here. */ + #selected: ReplicaLagSource | null | undefined; + + constructor(private readonly candidates: ReplicaLagSource[]) {} + + get name(): string { + return this.#selected ? this.#selected.name : "undetected"; + } + + async sampleLagMs(): Promise { + if (this.#selected === null) { + return undefined; + } + if (this.#selected) { + // Transient errors don't unselect the dialect; the sample is just skipped. + try { + return await this.#selected.sampleLagMs(); + } catch { + return undefined; + } + } + for (const candidate of this.candidates) { + try { + const lag = await candidate.sampleLagMs(); + this.#selected = candidate; + logger.info("[replicaLagEstimator] selected lag source", { source: candidate.name }); + return lag; + } catch { + // unsupported dialect; try the next + } + } + this.#selected = null; + logger.warn( + "[replicaLagEstimator] no usable lag source; relying on default + tripwire observations" + ); + return undefined; + } +} + +/** The standard composition for a Prisma replica client. */ +export function createPostgresReplicaLagSource(replica: RawQueryable): ReplicaLagSource { + return new FirstSupportedReplicaLagSource([ + new AuroraReplicaLagSource(replica), + new VanillaPgReplicaLagSource(replica), + ]); +} + +export type ReplicaLagEstimatorOptions = { + source: ReplicaLagSource; + /** Sample cadence while active. */ + sampleIntervalMs?: number; + /** Stop sampling this long after the last touch(); the next touch resumes. */ + idleAfterMs?: number; + /** The estimate is the max sample inside this window. */ + windowMs?: number; + /** Estimate before any sample lands (and the floor when sampling is unavailable). */ + defaultLagMs?: number; + /** Ceiling on accepted samples — shields the estimate from a wild observation. */ + maxLagMs?: number; + /** How long a tripwire observation floors the estimate. Sources that can't measure + * mid-apply lag (vanilla PG) return nothing, so without this floor the estimate decays + * to the caught-up zeros within windowMs and every ~window one wake pays a stale retry + * to re-learn. */ + observedFloorTtlMs?: number; + /** Observability: a sample (active or passive) was accepted. */ + onSample?: (lagMs: number, source: "probe" | "observed") => void; +}; + +const DEFAULT_SAMPLE_INTERVAL_MS = 250; +const DEFAULT_IDLE_AFTER_MS = 30_000; +const DEFAULT_WINDOW_MS = 5_000; +const DEFAULT_DEFAULT_LAG_MS = 30; +const DEFAULT_MAX_LAG_MS = 60_000; +const DEFAULT_OBSERVED_FLOOR_TTL_MS = 60_000; + +export class ReplicaLagEstimator { + readonly #sampleIntervalMs: number; + readonly #idleAfterMs: number; + readonly #windowMs: number; + readonly #defaultLagMs: number; + readonly #maxLagMs: number; + readonly #observedFloorTtlMs: number; + #samples: { atMs: number; lagMs: number }[] = []; + #lastKnownLagMs: number | undefined; + #observedFloorLagMs = 0; + #observedFloorAtMs = 0; + #lastTouchMs = 0; + #timer: ReturnType | undefined; + #sampling = false; + + constructor(private readonly options: ReplicaLagEstimatorOptions) { + this.#sampleIntervalMs = options.sampleIntervalMs ?? DEFAULT_SAMPLE_INTERVAL_MS; + this.#idleAfterMs = options.idleAfterMs ?? DEFAULT_IDLE_AFTER_MS; + this.#windowMs = options.windowMs ?? DEFAULT_WINDOW_MS; + this.#defaultLagMs = options.defaultLagMs ?? DEFAULT_DEFAULT_LAG_MS; + this.#maxLagMs = options.maxLagMs ?? DEFAULT_MAX_LAG_MS; + this.#observedFloorTtlMs = options.observedFloorTtlMs ?? DEFAULT_OBSERVED_FLOOR_TTL_MS; + } + + /** Mark router activity; starts (or keeps) the sampler running. */ + touch() { + this.#lastTouchMs = Date.now(); + if (!this.#timer) { + this.#timer = setInterval(() => this.#tick(), this.#sampleIntervalMs); + this.#timer.unref?.(); + // Sample immediately so the first wake after idle doesn't run on a stale estimate. + this.#tick(); + } + } + + /** Current lag estimate (ms): the max recent sample (else last known, else the default), + * floored by the latest tripwire observation while it's fresh. Never throws. */ + getLagMs(): number { + const now = Date.now(); + const cutoff = now - this.#windowMs; + let max: number | undefined; + for (const sample of this.#samples) { + if (sample.atMs >= cutoff && (max === undefined || sample.lagMs > max)) { + max = sample.lagMs; + } + } + const base = max ?? this.#lastKnownLagMs ?? this.#defaultLagMs; + const floor = + now - this.#observedFloorAtMs < this.#observedFloorTtlMs ? this.#observedFloorLagMs : 0; + return Math.max(base, floor); + } + + /** Feedback from the stale-hydrate tripwire: a read provably ran at least this far + * behind the primary. Widens the estimate immediately AND floors it for a while — + * sources that can't measure mid-apply lag would otherwise decay it straight back. */ + noteObservedLagMs(lagMs: number) { + const clamped = Math.min(Math.max(0, lagMs), this.#maxLagMs); + const floorExpired = Date.now() - this.#observedFloorAtMs >= this.#observedFloorTtlMs; + if (clamped >= this.#observedFloorLagMs || floorExpired) { + this.#observedFloorLagMs = clamped; + this.#observedFloorAtMs = Date.now(); + } + this.#accept(lagMs, "observed"); + } + + stop() { + if (this.#timer) { + clearInterval(this.#timer); + this.#timer = undefined; + } + } + + #accept(lagMs: number, source: "probe" | "observed") { + if (!Number.isFinite(lagMs)) { + return; + } + const clamped = Math.min(Math.max(0, lagMs), this.#maxLagMs); + const now = Date.now(); + this.#samples.push({ atMs: now, lagMs: clamped }); + this.#lastKnownLagMs = clamped; + const cutoff = now - this.#windowMs; + while (this.#samples.length > 0 && this.#samples[0].atMs < cutoff) { + this.#samples.shift(); + } + this.options.onSample?.(clamped, source); + } + + #tick() { + if (Date.now() - this.#lastTouchMs > this.#idleAfterMs) { + this.stop(); + return; + } + if (this.#sampling) { + return; // a slow sample shouldn't stack + } + this.#sampling = true; + this.options.source + .sampleLagMs() + .then((lagMs) => { + if (lagMs !== undefined) { + this.#accept(lagMs, "probe"); + } + }) + .catch(() => { + // sampling errors never propagate; the estimate just ages + }) + .finally(() => { + this.#sampling = false; + }); + } +} diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index c8d9240154e..082974af388 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -68,6 +68,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -149,6 +150,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -217,6 +219,7 @@ export function registerRunEngineEventBusHandlers() { envId: taskRun.runtimeEnvironmentId, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); if (!taskRun.organizationId) { @@ -409,6 +412,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -474,6 +478,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -572,10 +577,19 @@ export function registerRunEngineEventBusHandlers() { const { environment, runTags, batchId } = result; try { - await updateMetadataService.call(run.id, run.metadata, environment); + const updateResult = await updateMetadataService.call(run.id, run.metadata, environment); // Realtime run-changed publish, after the write so the router's hydrate sees the new - // row. A full record (env + tags + batchId), so feeds route by index. - publishChangeRecord({ runId: run.id, envId: environment.id, tags: runTags, batchId }); + // row. A full record (env + tags + batchId + the committed updatedAt watermark), so + // feeds route by index. Nothing written here (no-op or buffered) = nothing to announce. + if (updateResult?.updatedAtMs !== undefined) { + publishChangeRecord({ + runId: run.id, + envId: environment.id, + tags: runTags, + batchId, + updatedAtMs: updateResult.updatedAtMs, + }); + } } catch (e) { if (e instanceof MetadataTooLargeError) { logger.warn("[runMetadataUpdated] Failed to update metadata, too large", { diff --git a/apps/webapp/test/realtime/envChangeRouter.test.ts b/apps/webapp/test/realtime/envChangeRouter.test.ts index 6f2eb6df980..022b5827cd6 100644 --- a/apps/webapp/test/realtime/envChangeRouter.test.ts +++ b/apps/webapp/test/realtime/envChangeRouter.test.ts @@ -361,3 +361,190 @@ describe("EnvChangeRouter", () => { reg.close(); }); }); + +describe("EnvChangeRouter read-your-writes gate", () => { + function gate(overrides: Record = {}) { + const observed: number[] = []; + const outcomes: { outcome: string; runCount: number }[] = []; + return { + observed, + outcomes, + replicaLag: { + getLagMs: () => 0, + noteObservedLagMs: (ms: number) => observed.push(ms), + marginMs: 0, + maxDelayMs: 1_000, + staleRetries: 3, + onStaleHydrate: (outcome: string, runCount: number) => + outcomes.push({ outcome, runCount }), + ...overrides, + }, + }; + } + + it("delays the wake hydrate by lag+margin anchored to the record's updatedAtMs", async () => { + const rows = new Map([["r1", row("r1", { tags: ["a"] })]]); + const g = gate({ getLagMs: () => 80, marginMs: 10 }); + const { router, src, hydrateSpy } = makeRouter(rows, { replicaLag: g.replicaLag }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + const pushedAt = Date.now(); + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: pushedAt })]); + + // The hydrate must wait out the lag window (~90ms), not run immediately. + await new Promise((r) => setTimeout(r, 30)); + expect(hydrateSpy).not.toHaveBeenCalled(); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(80); + reg.close(); + }); + + it("does not delay when the record's anchor is already past the lag window", async () => { + // The row's updatedAt matches the watermark so the tripwire stays quiet. + const anchorMs = Date.now() - 5_000; + const rows = new Map([["r1", row("r1", { tags: ["a"], updatedAtMs: anchorMs })]]); + const g = gate({ getLagMs: () => 80, marginMs: 10 }); + const { router, src } = makeRouter(rows, { replicaLag: g.replicaLag }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + const pushedAt = Date.now(); + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: anchorMs })]); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(Date.now() - pushedAt).toBeLessThan(60); + reg.close(); + }); + + it("withholds a stale row, re-hydrates, and delivers only the fresh version", async () => { + const watermark = FLOOR_MS + 10_000; + const staleRow = row("r1", { tags: ["a"], updatedAtMs: FLOOR_MS + 5_000 }); + const freshRow = row("r1", { tags: ["a"], updatedAtMs: watermark }); + const hydrateSpy = vi + .fn() + .mockResolvedValueOnce([staleRow]) + .mockResolvedValue([freshRow]); + const src = fakeSource(); + const g = gate(); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(result.rows[0].row.updatedAt.getTime()).toBe(watermark); + expect(hydrateSpy).toHaveBeenCalledTimes(2); + expect(g.observed.length).toBe(1); // the stale read fed the estimator + expect(g.outcomes).toEqual([{ outcome: "recovered", runCount: 1 }]); + reg.close(); + }); + + it("a missing row with a watermark (insert race) retries and delivers when it appears", async () => { + const watermark = FLOOR_MS + 10_000; + const freshRow = row("r1", { tags: ["a"], updatedAtMs: watermark }); + const hydrateSpy = vi + .fn() + .mockResolvedValueOnce([]) + .mockResolvedValue([freshRow]); + const src = fakeSource(); + const g = gate(); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + + const result = await wait; + expect(result.rows[0].row.id).toBe("r1"); + expect(hydrateSpy).toHaveBeenCalledTimes(2); + reg.close(); + }); + + it("delivers the stale row after exhausting retries (liveness over freshness)", async () => { + const watermark = FLOOR_MS + 10_000; + const staleRow = row("r1", { tags: ["a"], updatedAtMs: FLOOR_MS + 5_000 }); + const hydrateSpy = vi.fn().mockResolvedValue([staleRow]); + const src = fakeSource(); + const g = gate({ staleRetries: 1 }); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(result.rows[0].row.updatedAt.getTime()).toBe(FLOOR_MS + 5_000); + expect(hydrateSpy).toHaveBeenCalledTimes(2); // first pass + 1 retry + expect(g.outcomes).toEqual([{ outcome: "gave_up", runCount: 1 }]); + reg.close(); + }); + + it("after giving up, echo passes deliver the fresh row once the replica catches up", async () => { + // Recent watermark (inside the echo horizon); retries exhausted immediately. + const watermark = Date.now() - 50; + const staleRow = row("r1", { tags: ["a"], updatedAtMs: watermark - 5_000 }); + const freshRow = row("r1", { tags: ["a"], updatedAtMs: watermark }); + const hydrateSpy = vi + .fn() + .mockResolvedValueOnce([staleRow]) // first pass: stale -> gave_up, delivered anyway + .mockResolvedValue([freshRow]); // echo pass: fresh + const src = fakeSource(); + const g = gate({ staleRetries: 0 }); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + + const first = reg.waitForMatch(undefined, 2_000); + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + const staleDelivery = await first; + expect(staleDelivery.rows[0].row.updatedAt.getTime()).toBe(watermark - 5_000); + expect(g.outcomes).toEqual([{ outcome: "gave_up", runCount: 1 }]); + + // The client re-arms (as it would after consuming the stale emission); the echo + // re-hydrate delivers the fresh version through the normal pipeline. + const echoed = await reg.waitForMatch(undefined, 2_000); + expect(echoed.reason).toBe("notify"); + expect(echoed.rows[0].row.updatedAt.getTime()).toBe(watermark); + reg.close(); + }); + + it("records without a watermark bypass the tripwire entirely", async () => { + const staleLooking = row("r1", { tags: ["a"], updatedAtMs: FLOOR_MS + 5_000 }); + const rows = new Map([["r1", staleLooking]]); + const g = gate(); + const { router, src, hydrateSpy } = makeRouter(rows, { replicaLag: g.replicaLag }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"] })]); // no updatedAtMs + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(hydrateSpy).toHaveBeenCalledTimes(1); + expect(g.outcomes).toEqual([]); + expect(g.observed).toEqual([]); + reg.close(); + }); +}); diff --git a/apps/webapp/test/realtime/replicaLagEstimator.test.ts b/apps/webapp/test/realtime/replicaLagEstimator.test.ts new file mode 100644 index 00000000000..7666ea28efb --- /dev/null +++ b/apps/webapp/test/realtime/replicaLagEstimator.test.ts @@ -0,0 +1,164 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + FirstSupportedReplicaLagSource, + ReplicaLagEstimator, + type ReplicaLagSource, +} from "~/services/realtime/replicaLagEstimator.server"; + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +function source(sampleLagMs: () => Promise, name = "fake"): ReplicaLagSource { + return { name, sampleLagMs }; +} + +describe("ReplicaLagEstimator", () => { + let estimator: ReplicaLagEstimator | undefined; + + afterEach(() => { + estimator?.stop(); + estimator = undefined; + }); + + it("returns the default before any sample lands", () => { + estimator = new ReplicaLagEstimator({ + source: source(async () => undefined), + defaultLagMs: 42, + }); + expect(estimator.getLagMs()).toBe(42); + }); + + it("samples the source while touched and reports the window max", async () => { + const samples = [10, 60, 20]; + let i = 0; + estimator = new ReplicaLagEstimator({ + source: source(async () => samples[Math.min(i++, samples.length - 1)]), + sampleIntervalMs: 10, + windowMs: 5_000, + defaultLagMs: 0, + }); + estimator.touch(); + await sleep(60); + // The max sample (60) dominates even after smaller ones land. + expect(estimator.getLagMs()).toBe(60); + }); + + it("widens immediately on an observed (tripwire) lag and clamps wild values", () => { + estimator = new ReplicaLagEstimator({ + source: source(async () => 5), + defaultLagMs: 5, + maxLagMs: 1_000, + }); + estimator.noteObservedLagMs(250); + expect(estimator.getLagMs()).toBe(250); + estimator.noteObservedLagMs(99_999); + expect(estimator.getLagMs()).toBe(1_000); + }); + + it("an observed lag floors the estimate past the sample window (until its TTL)", async () => { + estimator = new ReplicaLagEstimator({ + source: source(async () => 0), // caught-up zeros, like vanilla PG between writes + sampleIntervalMs: 10, + windowMs: 30, + defaultLagMs: 0, + observedFloorTtlMs: 10_000, + }); + estimator.touch(); + estimator.noteObservedLagMs(150); + // Long past windowMs the zeros have flushed the observation out of the window, + // but the floor still carries it. + await sleep(80); + expect(estimator.getLagMs()).toBe(150); + }); + + it("stops sampling once idle and resumes on touch", async () => { + let probes = 0; + estimator = new ReplicaLagEstimator({ + source: source(async () => { + probes++; + return 1; + }), + sampleIntervalMs: 10, + idleAfterMs: 20, + }); + estimator.touch(); + await sleep(80); + const afterIdle = probes; + expect(afterIdle).toBeGreaterThan(0); + await sleep(40); + // No new samples while idle... + expect(probes).toBe(afterIdle); + // ...and touching resumes immediately. + estimator.touch(); + await sleep(15); + expect(probes).toBeGreaterThan(afterIdle); + }); + + it("survives a throwing source and keeps the last known value", async () => { + let fail = false; + estimator = new ReplicaLagEstimator({ + source: source(async () => { + if (fail) throw new Error("source down"); + return 33; + }), + sampleIntervalMs: 10, + windowMs: 30, + defaultLagMs: 0, + }); + estimator.touch(); + await sleep(25); + expect(estimator.getLagMs()).toBe(33); + fail = true; + await sleep(60); + // Window emptied, source failing — falls back to the last known sample. + expect(estimator.getLagMs()).toBe(33); + }); +}); + +describe("FirstSupportedReplicaLagSource", () => { + it("selects the first candidate whose sample succeeds and sticks with it", async () => { + let auroraCalls = 0; + let vanillaCalls = 0; + const composed = new FirstSupportedReplicaLagSource([ + source(async () => { + auroraCalls++; + throw new Error("function aurora_replica_status() does not exist"); + }, "aurora"), + source(async () => { + vanillaCalls++; + return 7; + }, "vanilla-pg"), + ]); + expect(composed.name).toBe("undetected"); + expect(await composed.sampleLagMs()).toBe(7); + expect(composed.name).toBe("vanilla-pg"); + expect(await composed.sampleLagMs()).toBe(7); + // The unsupported dialect was only probed during selection. + expect(auroraCalls).toBe(1); + expect(vanillaCalls).toBe(2); + }); + + it("degrades to never-measuring when no candidate works", async () => { + const composed = new FirstSupportedReplicaLagSource([ + source(async () => { + throw new Error("nope"); + }), + ]); + expect(await composed.sampleLagMs()).toBeUndefined(); + expect(await composed.sampleLagMs()).toBeUndefined(); + }); + + it("a transient error after selection skips the sample without unselecting", async () => { + let calls = 0; + const composed = new FirstSupportedReplicaLagSource([ + source(async () => { + calls++; + if (calls === 2) throw new Error("transient"); + return 11; + }, "flaky"), + ]); + expect(await composed.sampleLagMs()).toBe(11); + expect(await composed.sampleLagMs()).toBeUndefined(); // transient error -> skipped sample + expect(await composed.sampleLagMs()).toBe(11); // still selected + expect(composed.name).toBe("flaky"); + }); +}); From 0e8088d9354cb92c095b671badbfbd7732d623e0 Mon Sep 17 00:00:00 2001 From: Eric Allam Date: Thu, 11 Jun 2026 18:20:01 +0100 Subject: [PATCH 2/2] chore(docker): opt-in lagged postgres replica for local dev A streaming standby behind the "replica" compose profile with a configurable recovery_min_apply_delay, so replica-lag behavior can be reproduced deterministically on a laptop. Also raises the primary's max_connections so multi-instance local testing has connection headroom. --- docker/docker-compose.yml | 50 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4cebfb35cd6..1e70d94cd0d 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -11,6 +11,7 @@ name: triggerdotdev-docker volumes: database-data: database-data-alt: + database-replica-data: redis-data: minio-data: clickhouse-data: @@ -44,6 +45,55 @@ services: - wal_level=logical - -c - shared_preload_libraries=pg_partman_bgw + # The webapp opens ~50 pooled connections per instance and Electric another + # ~40, so the default 100 is exhausted by one webapp + Electric alone. Raise + # it so multiple instances / load tests have headroom. + - -c + - max_connections=500 + + # Opt-in streaming read replica with configurable apply lag — a dial-a-lag rig for + # testing replica-race behavior (e.g. the realtime read-your-writes gate) locally. + # Start with: COMPOSE_PROFILES=replica pnpm run docker + # One-time primary prep (allows replication connections; additive, survives restarts): + # docker exec database bash -c 'grep -q "host replication" "$PGDATA/pg_hba.conf" || echo "host replication all all md5" >> "$PGDATA/pg_hba.conf"' + # docker exec database psql -U postgres -c "SELECT pg_reload_conf()" + # Then point the webapp at it: DATABASE_READ_REPLICA_URL=postgresql://postgres:postgres@localhost:5433/postgres + # Tune the lag via REPLICA_APPLY_DELAY (e.g. 150ms, 2s). Wipe database-replica-data to re-init. + database-replica: + container_name: ${CONTAINER_PREFIX:-}database-replica + profiles: ["replica"] + build: + context: . + dockerfile: Dockerfile.postgres + restart: always + depends_on: + - database + volumes: + - ${DB_REPLICA_VOLUME:-database-replica-data}:/var/lib/postgresql/data/ + environment: + PGPASSWORD: postgres + REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-150ms} + networks: + - app_network + ports: + - "${POSTGRES_REPLICA_HOST_PORT:-5433}:5432" + entrypoint: ["bash", "-c"] + command: + - | + set -e + if [ ! -s "$$PGDATA/PG_VERSION" ]; then + echo "initializing streaming replica from 'database'..." + mkdir -p "$$PGDATA" + chown postgres:postgres "$$PGDATA" + chmod 0700 "$$PGDATA" + until gosu postgres pg_basebackup -h database -U postgres -D "$$PGDATA" -Fp -Xs -R; do + echo "primary not ready for replication (did you run the one-time pg_hba prep above?); retrying..." + rm -rf "$$PGDATA"/* 2>/dev/null || true + sleep 2 + done + fi + # max_connections must be >= the primary's (hot-standby requirement). + exec docker-entrypoint.sh postgres -c hot_standby=on -c max_connections=500 -c "recovery_min_apply_delay=$$REPLICA_APPLY_DELAY" redis: container_name: ${CONTAINER_PREFIX:-}redis