diff --git a/.server-changes/realtime-replica-read-consistency.md b/.server-changes/realtime-replica-read-consistency.md new file mode 100644 index 0000000000..d23c73e682 --- /dev/null +++ b/.server-changes/realtime-replica-read-consistency.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: fix +--- + +Realtime feed reads now wait out measured read-replica lag and retry stale reads, so subscribers receive each change's current content instead of trailing one change behind when a read replica races the write. diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index 7963492683..d9c9771194 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -338,6 +338,28 @@ const EnvironmentSchema = z // TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend. REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000), REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000), + // "1" enables the read-your-writes gate: wake hydrates wait out the measured replica lag + // (anchored to the change record's updatedAtMs) and stale reads are retried. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED: z.string().default("1"), + // Reader-side lag probe cadence while the router is active; probing pauses when idle. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS: z.coerce.number().int().default(250), + REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS: z.coerce.number().int().default(30_000), + // The lag estimate is the max sample inside this window (spikes widen it immediately). + REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS: z.coerce.number().int().default(5_000), + // Estimate before the first sample lands (and the floor when probing is unavailable). + REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS: z.coerce.number().int().default(30), + // Safety margin (clock skew + scheduling) added on top of the lag estimate. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS: z.coerce.number().int().default(10), + // Hard cap on any single gate delay — a sick replica degrades freshness, never liveness. + REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS: z.coerce.number().int().default(1_000), + // Re-hydrate attempts for rows the tripwire still finds stale after the delay. + REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES: z.coerce.number().int().default(3), + // How long a tripwire-observed staleness floors the lag estimate (vanilla-PG replicas + // can't measure mid-apply lag, so observations carry the estimate between races). + REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS: z.coerce + .number() + .int() + .default(60_000), PUBSUB_REDIS_HOST: z .string() diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts index d3d92d1fe5..3f22929aca 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts @@ -100,7 +100,20 @@ export async function routeOperationsToRun( const [error, result] = await tryCatch( updateMetadataService.call(targetRunId, { operations }, env) ); - if (!error && result !== undefined) return; + if (!error && result !== undefined) { + // The parent/root run changed too — wake its live feeds (only when something was + // actually written here; buffered writes publish from the flusher). + if (result.updatedAtMs !== undefined) { + publishChangeRecord({ + runId: result.runId, + envId: env.id, + tags: result.runTags, + batchId: result.batchId, + updatedAtMs: result.updatedAtMs, + }); + } + return; + } if (error) { // PG threw — auxiliary op, stay best-effort and don't surface this @@ -186,13 +199,18 @@ const { action } = createActionApiRoute( } if (pgResult) { // Reflect metadata.set() on a live feed before the next lifecycle event. Publish the - // internal id (the router keys single-run feeds by it, not the friendly id from the URL). - publishChangeRecord({ - runId: pgResult.runId, - envId: env.id, - tags: pgResult.runTags, - batchId: pgResult.batchId, - }); + // internal id (the router keys single-run feeds by it, not the friendly id from the + // URL) with the committed updatedAt as the read-your-writes watermark. No write + // (no-op body, or ops buffered for the flusher) means nothing to announce here. + if (pgResult.updatedAtMs !== undefined) { + publishChangeRecord({ + runId: pgResult.runId, + envId: env.id, + tags: pgResult.runTags, + batchId: pgResult.batchId, + updatedAtMs: pgResult.updatedAtMs, + }); + } return json({ metadata: pgResult.metadata }, { status: 200 }); } diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index e98d3f3582..f984562eb3 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -84,20 +84,22 @@ export async function action({ request, params }: ActionFunctionArgs) { if (newTags.length === 0) { return json({ message: "No new tags to add" }, { status: 200 }); } - await prisma.taskRun.update({ + const updated = await prisma.taskRun.update({ where: { id: taskRun.id, runtimeEnvironmentId: env.id, }, data: { runTags: { push: newTags } }, + select: { updatedAt: true }, }); // Publish a run-changed record with the NEW tag set so tag feeds reindex - // (no-op unless enabled). + // (no-op unless enabled). updatedAt is the read-your-writes watermark. publishChangeRecord({ runId: taskRun.id, envId: env.id, tags: existing.concat(newTags), batchId: taskRun.batchId, + updatedAtMs: updated.updatedAt.getTime(), }); return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 }); }, diff --git a/apps/webapp/app/services/metadata/updateMetadata.server.ts b/apps/webapp/app/services/metadata/updateMetadata.server.ts index 3948da046f..7b87034a30 100644 --- a/apps/webapp/app/services/metadata/updateMetadata.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadata.server.ts @@ -30,6 +30,16 @@ export type UpdateMetadataServiceOptions = { maximumSize?: number; logger?: Logger; logLevel?: LogLevel; + /** Called after the batched flusher writes a run's buffered operations, with everything + * a realtime change record needs — buffered (parent/root) updates otherwise never wake + * live feeds. */ + onRunFlushed?: (run: { + runId: string; + environmentId: string; + tags: string[]; + batchId: string | null; + updatedAtMs: number; + }) => void; // Testing hooks onBeforeUpdate?: (runId: string) => Promise; onAfterRead?: (runId: string, metadataVersion: number) => Promise; @@ -172,12 +182,20 @@ export class UpdateMetadataService { operations: BufferedRunMetadataChangeOperation[] ) => { return Effect.gen(this, function* (_) { - // Fetch current run + // Fetch current run (+ the realtime membership keys, so a flush can publish) const run = yield* _( Effect.tryPromise(() => this._prisma.taskRun.findFirst({ where: { id: runId }, - select: { id: true, metadata: true, metadataType: true, metadataVersion: true }, + select: { + id: true, + metadata: true, + metadataType: true, + metadataVersion: true, + runtimeEnvironmentId: true, + runTags: true, + batchId: true, + }, }) ) ); @@ -237,6 +255,9 @@ export class UpdateMetadataService { yield* _(Effect.tryPromise(() => this.options.onBeforeUpdate!(runId))); } + // Stamp updatedAt explicitly so the realtime publish can carry the exact committed + // value without a follow-up read (updateMany can't RETURNING). + const writeTime = new Date(); const result = yield* _( Effect.tryPromise(() => this._prisma.taskRun.updateMany({ @@ -247,6 +268,7 @@ export class UpdateMetadataService { data: { metadata: newMetadataPacket.data, metadataVersion: { increment: 1 }, + updatedAt: writeTime, }, }) ) @@ -262,6 +284,16 @@ export class UpdateMetadataService { return yield* _(Effect.fail(new Error("Optimistic lock failed"))); } + yield* Effect.sync(() => { + this.options.onRunFlushed?.({ + runId, + environmentId: run.runtimeEnvironmentId, + tags: run.runTags, + batchId: run.batchId, + updatedAtMs: writeTime.getTime(), + }); + }); + return result; }); }; @@ -346,7 +378,7 @@ export class UpdateMetadataService { this.#ingestRunOperations(taskRun.rootTaskRun?.id ?? taskRun.id, body.rootOperations); } - const newMetadata = await this.#updateRunMetadata({ + const result = await this.#updateRunMetadata({ runId: taskRun.id, body, existingMetadata: { @@ -356,11 +388,14 @@ export class UpdateMetadataService { }); return { - metadata: newMetadata, + metadata: result?.metadata, // Internal id + membership keys, so callers can publish full realtime records the router routes by index. runId: taskRun.id, batchId: taskRun.batchId, runTags: taskRun.runTags, + // The committed row's updatedAt — the realtime watermark. Undefined when nothing was + // written here (no-op, or buffered for the flusher, which publishes itself). + updatedAtMs: result?.updatedAtMs, }; } @@ -372,7 +407,7 @@ export class UpdateMetadataService { runId: string; body: UpdateMetadataRequestBody; existingMetadata: IOPacket; - }) { + }): Promise<{ metadata: Record | undefined; updatedAtMs?: number }> { if (Array.isArray(body.operations)) { return this.#updateRunMetadataWithOperations(runId, body.operations); } else { @@ -380,7 +415,10 @@ export class UpdateMetadataService { } } - async #updateRunMetadataWithOperations(runId: string, operations: RunMetadataChangeOperation[]) { + async #updateRunMetadataWithOperations( + runId: string, + operations: RunMetadataChangeOperation[] + ): Promise<{ metadata: Record | undefined; updatedAtMs?: number }> { const MAX_RETRIES = 3; let attempts = 0; @@ -408,9 +446,9 @@ export class UpdateMetadataService { // Apply operations to the current metadata const applyResults = applyMetadataOperations(currentMetadata, operations); - // If no operations were applied, return the current metadata + // If no operations were applied, return the current metadata (nothing written) if (applyResults.unappliedOperations.length === operations.length) { - return currentMetadata; + return { metadata: currentMetadata }; } const newMetadataPacket = handleMetadataPacket( @@ -428,7 +466,9 @@ export class UpdateMetadataService { await this.options.onBeforeUpdate(runId); } - // Update with optimistic locking + // Update with optimistic locking; updatedAt stamped explicitly so the caller can + // publish the exact committed watermark without a follow-up read. + const writeTime = new Date(); const result = await this._prisma.taskRun.updateMany({ where: { id: runId, @@ -440,6 +480,7 @@ export class UpdateMetadataService { metadataVersion: { increment: 1, }, + updatedAt: writeTime, }, }); @@ -454,9 +495,10 @@ export class UpdateMetadataService { } // If this was our last attempt, buffer the operations and return optimistically + // (no watermark — the flusher writes later and publishes itself). if (attempts === MAX_RETRIES) { this.#ingestRunOperations(runId, operations); - return applyResults.newMetadata; + return { metadata: applyResults.newMetadata }; } // Otherwise sleep and try again @@ -474,8 +516,10 @@ export class UpdateMetadataService { } // Success! Return the new metadata - return applyResults.newMetadata; + return { metadata: applyResults.newMetadata, updatedAtMs: writeTime.getTime() }; } + + return { metadata: undefined }; } // Checks to see if a run is updatable @@ -493,7 +537,7 @@ export class UpdateMetadataService { runId: string, body: UpdateMetadataRequestBody, existingMetadata: IOPacket - ) { + ): Promise<{ metadata: Record | undefined; updatedAtMs?: number }> { const metadataPacket = handleMetadataPacket( body.metadata, "application/json", @@ -501,9 +545,11 @@ export class UpdateMetadataService { ); if (!metadataPacket) { - return {}; + return { metadata: {} }; } + let updatedAtMs: number | undefined; + if ( metadataPacket.data !== "{}" || (existingMetadata.data && metadataPacket.data !== existingMetadata.data) @@ -515,7 +561,9 @@ export class UpdateMetadataService { }); } - // Update the metadata without version check + // Update the metadata without version check; updatedAt stamped explicitly so the + // caller can publish the exact committed watermark. + const writeTime = new Date(); await this._prisma.taskRun.update({ where: { id: runId, @@ -526,12 +574,14 @@ export class UpdateMetadataService { metadataVersion: { increment: 1, }, + updatedAt: writeTime, }, }); + updatedAtMs = writeTime.getTime(); } const newMetadata = await parsePacket(metadataPacket); - return newMetadata; + return { metadata: newMetadata, updatedAtMs }; } #ingestRunOperations(runId: string, operations: RunMetadataChangeOperation[]) { diff --git a/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts b/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts index fe7c3b63f5..9f1818e5ed 100644 --- a/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts +++ b/apps/webapp/app/services/metadata/updateMetadataInstance.server.ts @@ -2,6 +2,7 @@ import { singleton } from "~/utils/singleton"; import { env } from "~/env.server"; import { UpdateMetadataService } from "./updateMetadata.server"; import { prisma } from "~/db.server"; +import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server"; export const updateMetadataService = singleton( "update-metadata-service", @@ -13,5 +14,16 @@ export const updateMetadataService = singleton( flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1", maximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE, logLevel: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1" ? "debug" : "info", + // Buffered (parent/root) operations land via the flusher, not the caller's request — + // publish here so those changes wake live feeds too (no-op when the backend is off). + onRunFlushed: (run) => { + publishChangeRecord({ + runId: run.runId, + envId: run.environmentId, + tags: run.tags, + batchId: run.batchId, + updatedAtMs: run.updatedAtMs, + }); + }, }) ); diff --git a/apps/webapp/app/services/realtime/envChangeRouter.server.ts b/apps/webapp/app/services/realtime/envChangeRouter.server.ts index 587f0e55c5..6c5969e02c 100644 --- a/apps/webapp/app/services/realtime/envChangeRouter.server.ts +++ b/apps/webapp/app/services/realtime/envChangeRouter.server.ts @@ -51,6 +51,25 @@ export type EnvChangeRouterOptions = { /** Observability: a buffered record was evicted. `cap` evictions mean the env churns more * runs inside the window than the buffer holds (the replay guarantee is degrading). */ onReplayEviction?: (reason: "cap" | "window") => void; + /** Read-your-writes gate over the replica: delays wake-path hydrates until the replica + * should have applied the change (record.updatedAtMs + lag + margin), and re-hydrates + * rows the tripwire still finds stale. Omit to hydrate immediately (legacy behavior). */ + replicaLag?: ReplicaLagGate; +}; + +export type ReplicaLagGate = { + /** Current replica-lag estimate (ms). */ + getLagMs(): number; + /** Feedback: a hydrate provably read at least this far behind the primary. */ + noteObservedLagMs(lagMs: number): void; + /** Safety margin added on top of the estimate (clock skew + scheduling). */ + marginMs: number; + /** Hard cap on any single gate delay — a sick replica degrades freshness, never liveness. */ + maxDelayMs: number; + /** Re-hydrate attempts for rows that still read stale after the delay. */ + staleRetries: number; + /** Observability: stale rows recovered by a retry, or delivered stale after exhausting them. */ + onStaleHydrate?: (outcome: "recovered" | "gave_up", runCount: number) => void; }; const DEFAULT_REPLAY_WINDOW_MS = 2_000; @@ -98,6 +117,13 @@ type EnvState = { lingerTimer?: ReturnType; }; +function sleepMs(ms: number): Promise { + return new Promise((resolve) => { + const timer = setTimeout(resolve, ms); + timer.unref?.(); + }); +} + function addToIndex(index: Map>, key: string, feed: Feed) { let set = index.get(key); if (!set) { @@ -322,6 +348,24 @@ export class EnvChangeRouter { } } + /** How long to wait before hydrating so the replica has applied every change in the + * batch: each record is safe at updatedAtMs + lag + margin (records without a watermark + * anchor at now, degrading to a plain lag-sized delay). Capped — see ReplicaLagGate. */ + #gateDelayMs(records: ChangeRecord[]): number { + const gate = this.options.replicaLag; + if (!gate || records.length === 0) { + return 0; + } + const now = Date.now(); + const lagMs = gate.getLagMs(); + let safeAtMs = 0; + for (const record of records) { + const anchorMs = record.updatedAtMs ?? now; + safeAtMs = Math.max(safeAtMs, anchorMs + lagMs + gate.marginMs); + } + return Math.max(0, Math.min(safeAtMs - now, gate.maxDelayMs)); + } + /** Deliver buffered records newer than the feed's cursor through the normal * hydrate -> serialize -> settle pipeline. Already-seen rows diff to nothing downstream. */ async #replayRecent(environmentId: string, env: EnvState, feed: Feed) { @@ -329,15 +373,28 @@ export class EnvChangeRouter { feed.replayCursorMs = Date.now(); const runIds: string[] = []; + const candidateRecords: ChangeRecord[] = []; for (const [runId, entry] of env.recent) { if (entry.receivedAtMs > cursor && this.#recordMatchesFeed(entry.record, feed)) { runIds.push(runId); + candidateRecords.push(entry.record); } } if (runIds.length === 0 || !feed.resolve) { return; } + // Replayed records are usually past the lag window already (delay computes to 0); a + // just-buffered one gets the same read-your-writes gate as the live path. No tripwire + // here — a stale replay diffs to a re-emission on the next wake or backstop. + const replayDelayMs = this.#gateDelayMs(candidateRecords); + if (replayDelayMs > 0) { + await sleepMs(replayDelayMs); + if (!feed.resolve) { + return; + } + } + const hydrated = await this.options.hydrator.hydrateByIds( environmentId, runIds, @@ -399,7 +456,17 @@ export class EnvChangeRouter { } } - async #onBatch(environmentId: string, env: EnvState, records: ChangeRecord[]) { + async #onBatch(environmentId: string, env: EnvState, records: ChangeRecord[], attempt = 0) { + // 0. Read-your-writes gate: wait out the replica's apply lag before hydrating, so the + // rows we read contain the changes the records announce. Retry attempts were + // scheduled with their own delay, so only the first pass gates here. + if (attempt === 0) { + const delayMs = this.#gateDelayMs(records); + if (delayMs > 0) { + await sleepMs(delayMs); + } + } + // 1. Route each record to the held feeds it matches; collect matched runIds per feed. const matchedRunIdsByFeed = new Map>(); const addMatch = (feed: Feed, runId: string) => { @@ -485,6 +552,63 @@ export class EnvChangeRouter { }) ); + // 3.5 Stale tripwire: a watermarked record whose hydrated row is older (or missing — + // the insert race) read a replica that hadn't applied the change. Withhold those + // rows and re-hydrate shortly. Exhausting the retry budget delivers what we have + // (liveness over freshness) — but a stale emission advances the feed's cursor, so + // it ALSO schedules echo passes past the gate: re-hydrates flowing through normal + // emission, where the working-set diff drops unchanged rows and emits the fresh + // version once the replica catches up. The backstop stays the terminal net. + // Each detection feeds the lag estimator. + const gate = this.options.replicaLag; + const isEchoPass = gate !== undefined && attempt > gate.staleRetries; + const staleRunIds = gate + ? this.#detectStaleRuns(records, runIdsByColumnSig, hydratedByColumnSig) + : new Set(); + if (attempt > 0 && !isEchoPass) { + const recovered = new Set(records.map((r) => r.runId)).size - staleRunIds.size; + if (recovered > 0) { + gate?.onStaleHydrate?.("recovered", recovered); + } + } + if (staleRunIds.size > 0 && gate) { + const staleRecords = records.filter((record) => staleRunIds.has(record.runId)); + // Re-buffer the withheld records so a feed that re-arms between now and the next + // pass replays them instead of waiting for its backstop. + this.#bufferRecent(env, staleRecords); + if (attempt >= gate.staleRetries) { + // Budget exhausted: deliver the stale rows below (liveness) — but a stale emission + // advances the feed's cursor, so keep echoing re-hydrates through normal emission + // (the working-set diff drops unchanged rows, emits the fresh version when the + // replica catches up). Echoes stop once the change ages past the horizon; deeper + // outages are the backstop's job. + if (attempt === gate.staleRetries) { + gate.onStaleHydrate?.("gave_up", staleRunIds.size); + } + staleRunIds.clear(); + } + const echoHorizonMs = gate.maxDelayMs * 10; + const newestWatermarkMs = Math.max( + ...staleRecords.map((record) => record.updatedAtMs ?? 0) + ); + const withinEchoHorizon = Date.now() - newestWatermarkMs < echoHorizonMs; + if (attempt < gate.staleRetries || withinEchoHorizon) { + const retryDelayMs = Math.max( + 25, + Math.min(gate.getLagMs() + gate.marginMs, gate.maxDelayMs) + ); + const timer = setTimeout(() => { + this.#onBatch(environmentId, env, staleRecords, attempt + 1).catch((error) => { + logger.error("[envChangeRouter] failed to re-hydrate stale rows", { + environmentId, + error, + }); + }); + }, retryDelayMs); + timer.unref?.(); + } + } + // 4. Assemble each feed's matched rows (post-filtering tag feeds against the // authoritative hydrated row) and resolve its pending wait. for (const [feed, runIds] of matchedRunIdsByFeed) { @@ -496,6 +620,9 @@ export class EnvChangeRouter { const rows: MatchedRow[] = []; for (const runId of runIds) { + if (staleRunIds.has(runId)) { + continue; // withheld; the scheduled re-hydrate delivers the fresh version + } const matched = hydrated.get(runId); if (!matched) continue; // run not found / left the table if (feed.filter.kind === "tag" && !this.#tagRowMatches(matched.row, feed.filter)) { @@ -512,6 +639,49 @@ export class EnvChangeRouter { } } + /** Runs whose hydrated row is provably behind its record's watermark (stale content), + * or absent entirely despite a watermark (the insert hasn't applied). Records without + * `updatedAtMs` can't be judged and always pass. */ + #detectStaleRuns( + records: ChangeRecord[], + runIdsByColumnSig: Map }>, + hydratedByColumnSig: Map> + ): Set { + const gate = this.options.replicaLag; + const stale = new Set(); + if (!gate) { + return stale; + } + const expectedByRunId = new Map(); + for (const record of records) { + if (record.updatedAtMs !== undefined) { + const existing = expectedByRunId.get(record.runId); + if (existing === undefined || record.updatedAtMs > existing) { + expectedByRunId.set(record.runId, record.updatedAtMs); + } + } + } + if (expectedByRunId.size === 0) { + return stale; + } + const now = Date.now(); + for (const [columnSig, group] of runIdsByColumnSig) { + const hydrated = hydratedByColumnSig.get(columnSig); + for (const runId of group.runIds) { + const expected = expectedByRunId.get(runId); + if (expected === undefined || stale.has(runId)) { + continue; + } + const matched = hydrated?.get(runId); + if (!matched || matched.row.updatedAt.getTime() < expected) { + stale.add(runId); + gate.noteObservedLagMs(now - expected); + } + } + } + return stale; + } + /** Authoritative re-check for tag feeds: the hydrated row carries ALL the filter's tags * (Electric's `runTags @> ARRAY[...]` semantics) and its createdAt is within the window. */ #tagRowMatches(row: RealtimeRunRow, filter: Extract): boolean { diff --git a/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts b/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts index c41149f0cc..012c28c08f 100644 --- a/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts +++ b/apps/webapp/app/services/realtime/nativeRealtimeClientInstance.server.ts @@ -5,11 +5,12 @@ import { singleton } from "~/utils/singleton"; import { getCachedLimit } from "../platform.v3.server"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { ClickHouseRunListResolver } from "./clickHouseRunListResolver.server"; -import { EnvChangeRouter } from "./envChangeRouter.server"; +import { EnvChangeRouter, type EnvChangeSource } from "./envChangeRouter.server"; import { NativeRealtimeClient } from "./nativeRealtimeClient.server"; import { RealtimeConcurrencyLimiter } from "./realtimeConcurrencyLimiter.server"; import { getRunChangeNotifier } from "./runChangeNotifierInstance.server"; import { RedisReplayCursorStore } from "./replayCursorStore.server"; +import { createPostgresReplicaLagSource, ReplicaLagEstimator } from "./replicaLagEstimator.server"; import { RunHydrator } from "./runReader.server"; // Process-singleton wiring for the native realtime client; only constructed when a @@ -83,6 +84,11 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { "Shared replay-cursor store operations by outcome. Errors degrade hops to cold resolves (watch live_polls{path='cold-resolve'} rise with them), never failed polls.", }); + const staleHydrates = meter.createCounter("realtime_native.stale_hydrates", { + description: + "Wake hydrates the read-your-writes tripwire caught reading behind the publish. 'recovered' = a retry delivered the fresh row; sustained 'gave_up' means replica lag is outrunning the retry budget.", + }); + const limiter = new RealtimeConcurrencyLimiter({ keyPrefix: "tr:realtime:native:concurrency", redis: { @@ -120,8 +126,37 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { maxCacheEntries: env.REALTIME_BACKEND_NATIVE_RUN_CACHE_MAX_ENTRIES, }); + // Read-your-writes gate: the estimator samples replica lag (reader-side only, paused + // when idle) and the router delays wake hydrates by it, anchored to each record's + // updatedAtMs — so a publish racing the replica's apply is waited out, not read stale. + const lagEstimator = + env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED === "1" + ? new ReplicaLagEstimator({ + source: createPostgresReplicaLagSource($replica), + sampleIntervalMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS, + idleAfterMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS, + windowMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS, + defaultLagMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS, + observedFloorTtlMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS, + }) + : undefined; + + // The notifier wrapped so router activity keeps the lag sampler warm. + const notifier = getRunChangeNotifier(); + const source: EnvChangeSource = lagEstimator + ? { + subscribeToEnv(environmentId, onBatch) { + lagEstimator.touch(); + return notifier.subscribeToEnv(environmentId, (records) => { + lagEstimator.touch(); + onBatch(records); + }); + }, + } + : notifier; + const router = new EnvChangeRouter({ - source: getRunChangeNotifier(), + source, hydrator: runReader, onHydrate: (runCount) => routerHydrates.add(runCount), replayWindowMs: env.REALTIME_BACKEND_NATIVE_REPLAY_WINDOW_MS, @@ -129,6 +164,16 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { unsubscribeLingerMs: env.REALTIME_BACKEND_NATIVE_UNSUBSCRIBE_LINGER_MS, onReplay: (result) => replays.add(1, { result }), onReplayEviction: (reason) => replayEvictions.add(1, { reason }), + replicaLag: lagEstimator + ? { + getLagMs: () => lagEstimator.getLagMs(), + noteObservedLagMs: (lagMs) => lagEstimator.noteObservedLagMs(lagMs), + marginMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS, + maxDelayMs: env.REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS, + staleRetries: env.REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES, + onStaleHydrate: (outcome, runCount) => staleHydrates.add(runCount, { outcome }), + } + : undefined, }); const client = new NativeRealtimeClient({ @@ -208,6 +253,15 @@ function initializeNativeRealtimeClient(): NativeRealtimeClient { }) .addCallback((result) => result.observe(router.activeEnvCount)); + if (lagEstimator) { + meter + .createObservableGauge("realtime_native.replica_lag_estimate_ms", { + description: + "The read-your-writes gate's current replica-lag estimate (max sample in the window). Wake hydrates are delayed by roughly this much past each change's commit.", + }) + .addCallback((result) => result.observe(lagEstimator.getLagMs())); + } + return client; } diff --git a/apps/webapp/app/services/realtime/replicaLagEstimator.server.ts b/apps/webapp/app/services/realtime/replicaLagEstimator.server.ts new file mode 100644 index 0000000000..17aa7e2b89 --- /dev/null +++ b/apps/webapp/app/services/realtime/replicaLagEstimator.server.ts @@ -0,0 +1,257 @@ +import { logger } from "~/services/logger.server"; + +/** + * ReplicaLagEstimator — tracks how far the read replica trails the primary so the + * EnvChangeRouter can delay wake-path hydrates just long enough to read their own writes. + * Two inputs: a ReplicaLagSource (active, reader-side only — never queries the primary) + * sampled on an interval while the router is busy, and passive observations fed back by + * the router's stale-hydrate tripwire. The estimate is the max over a short window — + * floored by recent observations — so spikes widen the delay immediately and decay back + * out as fresh samples land. + */ + +type RawQueryable = { + $queryRawUnsafe(query: string): Promise; +}; + +/** A dialect-specific reader-side lag measure. `sampleLagMs()` returns the current lag in + * ms, or undefined when lag is genuinely unmeasurable right now (NOT an error — errors + * throw, and the composing source uses them to rule the dialect out). */ +export interface ReplicaLagSource { + readonly name: string; + sampleLagMs(): Promise; +} + +/** Aurora: replicas share the storage layer and reject every standard WAL function; + * `aurora_replica_status()` is the only live lag source. Max across readers, since the + * `$replica` pool balances over all of them. No reader rows = `$replica` is the writer = + * no lag. Throws on non-Aurora (the function doesn't exist). */ +export class AuroraReplicaLagSource implements ReplicaLagSource { + readonly name = "aurora"; + + constructor(private readonly db: RawQueryable) {} + + async sampleLagMs(): Promise { + const rows = await this.db.$queryRawUnsafe<{ lag: number | null }[]>( + `SELECT max(replica_lag_in_msec)::float8 AS lag FROM aurora_replica_status() WHERE session_id <> 'MASTER_SESSION_ID' AND replica_lag_in_msec IS NOT NULL` + ); + const lag = rows[0]?.lag; + return typeof lag === "number" && Number.isFinite(lag) ? Math.max(0, lag) : 0; + } +} + +/** Vanilla PG streaming replication. A primary (not in recovery — no replica configured, + * `$replica` is the writer) has no lag by definition; a caught-up replica (receive LSN == + * replay LSN) reports 0. Mid-apply there is NO honest reader-side timestamp measure — + * `now() - pg_last_xact_replay_timestamp()` reads as the full inter-write gap on + * low-traffic systems, which (measured locally) pins the estimate at the delay cap — so + * mid-apply reports undefined and the tripwire's observed-staleness floor carries the + * estimate instead. */ +export class VanillaPgReplicaLagSource implements ReplicaLagSource { + readonly name = "vanilla-pg"; + + constructor(private readonly db: RawQueryable) {} + + async sampleLagMs(): Promise { + const rows = await this.db.$queryRawUnsafe<{ caught_up: boolean | null }[]>( + `SELECT CASE + WHEN NOT pg_is_in_recovery() THEN true + WHEN pg_last_wal_receive_lsn() IS NOT DISTINCT FROM pg_last_wal_replay_lsn() THEN true + ELSE false + END AS caught_up` + ); + return rows[0]?.caught_up ? 0 : undefined; + } +} + +/** Composes dialect sources: the first whose sample succeeds is selected and used from + * then on; a database where none work degrades to never-measuring (the estimator then + * runs on its default + tripwire observations). Selection is by thrown-vs-returned — + * sources throw on unsupported dialects and return undefined for "can't measure now". */ +export class FirstSupportedReplicaLagSource implements ReplicaLagSource { + /** undefined = not probed yet; null = no candidate works here. */ + #selected: ReplicaLagSource | null | undefined; + + constructor(private readonly candidates: ReplicaLagSource[]) {} + + get name(): string { + return this.#selected ? this.#selected.name : "undetected"; + } + + async sampleLagMs(): Promise { + if (this.#selected === null) { + return undefined; + } + if (this.#selected) { + // Transient errors don't unselect the dialect; the sample is just skipped. + try { + return await this.#selected.sampleLagMs(); + } catch { + return undefined; + } + } + for (const candidate of this.candidates) { + try { + const lag = await candidate.sampleLagMs(); + this.#selected = candidate; + logger.info("[replicaLagEstimator] selected lag source", { source: candidate.name }); + return lag; + } catch { + // unsupported dialect; try the next + } + } + this.#selected = null; + logger.warn( + "[replicaLagEstimator] no usable lag source; relying on default + tripwire observations" + ); + return undefined; + } +} + +/** The standard composition for a Prisma replica client. */ +export function createPostgresReplicaLagSource(replica: RawQueryable): ReplicaLagSource { + return new FirstSupportedReplicaLagSource([ + new AuroraReplicaLagSource(replica), + new VanillaPgReplicaLagSource(replica), + ]); +} + +export type ReplicaLagEstimatorOptions = { + source: ReplicaLagSource; + /** Sample cadence while active. */ + sampleIntervalMs?: number; + /** Stop sampling this long after the last touch(); the next touch resumes. */ + idleAfterMs?: number; + /** The estimate is the max sample inside this window. */ + windowMs?: number; + /** Estimate before any sample lands (and the floor when sampling is unavailable). */ + defaultLagMs?: number; + /** Ceiling on accepted samples — shields the estimate from a wild observation. */ + maxLagMs?: number; + /** How long a tripwire observation floors the estimate. Sources that can't measure + * mid-apply lag (vanilla PG) return nothing, so without this floor the estimate decays + * to the caught-up zeros within windowMs and every ~window one wake pays a stale retry + * to re-learn. */ + observedFloorTtlMs?: number; + /** Observability: a sample (active or passive) was accepted. */ + onSample?: (lagMs: number, source: "probe" | "observed") => void; +}; + +const DEFAULT_SAMPLE_INTERVAL_MS = 250; +const DEFAULT_IDLE_AFTER_MS = 30_000; +const DEFAULT_WINDOW_MS = 5_000; +const DEFAULT_DEFAULT_LAG_MS = 30; +const DEFAULT_MAX_LAG_MS = 60_000; +const DEFAULT_OBSERVED_FLOOR_TTL_MS = 60_000; + +export class ReplicaLagEstimator { + readonly #sampleIntervalMs: number; + readonly #idleAfterMs: number; + readonly #windowMs: number; + readonly #defaultLagMs: number; + readonly #maxLagMs: number; + readonly #observedFloorTtlMs: number; + #samples: { atMs: number; lagMs: number }[] = []; + #lastKnownLagMs: number | undefined; + #observedFloorLagMs = 0; + #observedFloorAtMs = 0; + #lastTouchMs = 0; + #timer: ReturnType | undefined; + #sampling = false; + + constructor(private readonly options: ReplicaLagEstimatorOptions) { + this.#sampleIntervalMs = options.sampleIntervalMs ?? DEFAULT_SAMPLE_INTERVAL_MS; + this.#idleAfterMs = options.idleAfterMs ?? DEFAULT_IDLE_AFTER_MS; + this.#windowMs = options.windowMs ?? DEFAULT_WINDOW_MS; + this.#defaultLagMs = options.defaultLagMs ?? DEFAULT_DEFAULT_LAG_MS; + this.#maxLagMs = options.maxLagMs ?? DEFAULT_MAX_LAG_MS; + this.#observedFloorTtlMs = options.observedFloorTtlMs ?? DEFAULT_OBSERVED_FLOOR_TTL_MS; + } + + /** Mark router activity; starts (or keeps) the sampler running. */ + touch() { + this.#lastTouchMs = Date.now(); + if (!this.#timer) { + this.#timer = setInterval(() => this.#tick(), this.#sampleIntervalMs); + this.#timer.unref?.(); + // Sample immediately so the first wake after idle doesn't run on a stale estimate. + this.#tick(); + } + } + + /** Current lag estimate (ms): the max recent sample (else last known, else the default), + * floored by the latest tripwire observation while it's fresh. Never throws. */ + getLagMs(): number { + const now = Date.now(); + const cutoff = now - this.#windowMs; + let max: number | undefined; + for (const sample of this.#samples) { + if (sample.atMs >= cutoff && (max === undefined || sample.lagMs > max)) { + max = sample.lagMs; + } + } + const base = max ?? this.#lastKnownLagMs ?? this.#defaultLagMs; + const floor = + now - this.#observedFloorAtMs < this.#observedFloorTtlMs ? this.#observedFloorLagMs : 0; + return Math.max(base, floor); + } + + /** Feedback from the stale-hydrate tripwire: a read provably ran at least this far + * behind the primary. Widens the estimate immediately AND floors it for a while — + * sources that can't measure mid-apply lag would otherwise decay it straight back. */ + noteObservedLagMs(lagMs: number) { + const clamped = Math.min(Math.max(0, lagMs), this.#maxLagMs); + const floorExpired = Date.now() - this.#observedFloorAtMs >= this.#observedFloorTtlMs; + if (clamped >= this.#observedFloorLagMs || floorExpired) { + this.#observedFloorLagMs = clamped; + this.#observedFloorAtMs = Date.now(); + } + this.#accept(lagMs, "observed"); + } + + stop() { + if (this.#timer) { + clearInterval(this.#timer); + this.#timer = undefined; + } + } + + #accept(lagMs: number, source: "probe" | "observed") { + if (!Number.isFinite(lagMs)) { + return; + } + const clamped = Math.min(Math.max(0, lagMs), this.#maxLagMs); + const now = Date.now(); + this.#samples.push({ atMs: now, lagMs: clamped }); + this.#lastKnownLagMs = clamped; + const cutoff = now - this.#windowMs; + while (this.#samples.length > 0 && this.#samples[0].atMs < cutoff) { + this.#samples.shift(); + } + this.options.onSample?.(clamped, source); + } + + #tick() { + if (Date.now() - this.#lastTouchMs > this.#idleAfterMs) { + this.stop(); + return; + } + if (this.#sampling) { + return; // a slow sample shouldn't stack + } + this.#sampling = true; + this.options.source + .sampleLagMs() + .then((lagMs) => { + if (lagMs !== undefined) { + this.#accept(lagMs, "probe"); + } + }) + .catch(() => { + // sampling errors never propagate; the estimate just ages + }) + .finally(() => { + this.#sampling = false; + }); + } +} diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index c8d9240154..082974af38 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -68,6 +68,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -149,6 +150,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -217,6 +219,7 @@ export function registerRunEngineEventBusHandlers() { envId: taskRun.runtimeEnvironmentId, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); if (!taskRun.organizationId) { @@ -409,6 +412,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -474,6 +478,7 @@ export function registerRunEngineEventBusHandlers() { envId: environment.id, tags: taskRun.runTags, batchId: taskRun.batchId, + updatedAtMs: run.updatedAt.getTime(), }); const eventRepository = await getEventRepositoryForStore( @@ -572,10 +577,19 @@ export function registerRunEngineEventBusHandlers() { const { environment, runTags, batchId } = result; try { - await updateMetadataService.call(run.id, run.metadata, environment); + const updateResult = await updateMetadataService.call(run.id, run.metadata, environment); // Realtime run-changed publish, after the write so the router's hydrate sees the new - // row. A full record (env + tags + batchId), so feeds route by index. - publishChangeRecord({ runId: run.id, envId: environment.id, tags: runTags, batchId }); + // row. A full record (env + tags + batchId + the committed updatedAt watermark), so + // feeds route by index. Nothing written here (no-op or buffered) = nothing to announce. + if (updateResult?.updatedAtMs !== undefined) { + publishChangeRecord({ + runId: run.id, + envId: environment.id, + tags: runTags, + batchId, + updatedAtMs: updateResult.updatedAtMs, + }); + } } catch (e) { if (e instanceof MetadataTooLargeError) { logger.warn("[runMetadataUpdated] Failed to update metadata, too large", { diff --git a/apps/webapp/test/realtime/envChangeRouter.test.ts b/apps/webapp/test/realtime/envChangeRouter.test.ts index 6f2eb6df98..022b5827cd 100644 --- a/apps/webapp/test/realtime/envChangeRouter.test.ts +++ b/apps/webapp/test/realtime/envChangeRouter.test.ts @@ -361,3 +361,190 @@ describe("EnvChangeRouter", () => { reg.close(); }); }); + +describe("EnvChangeRouter read-your-writes gate", () => { + function gate(overrides: Record = {}) { + const observed: number[] = []; + const outcomes: { outcome: string; runCount: number }[] = []; + return { + observed, + outcomes, + replicaLag: { + getLagMs: () => 0, + noteObservedLagMs: (ms: number) => observed.push(ms), + marginMs: 0, + maxDelayMs: 1_000, + staleRetries: 3, + onStaleHydrate: (outcome: string, runCount: number) => + outcomes.push({ outcome, runCount }), + ...overrides, + }, + }; + } + + it("delays the wake hydrate by lag+margin anchored to the record's updatedAtMs", async () => { + const rows = new Map([["r1", row("r1", { tags: ["a"] })]]); + const g = gate({ getLagMs: () => 80, marginMs: 10 }); + const { router, src, hydrateSpy } = makeRouter(rows, { replicaLag: g.replicaLag }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + const pushedAt = Date.now(); + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: pushedAt })]); + + // The hydrate must wait out the lag window (~90ms), not run immediately. + await new Promise((r) => setTimeout(r, 30)); + expect(hydrateSpy).not.toHaveBeenCalled(); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(Date.now() - pushedAt).toBeGreaterThanOrEqual(80); + reg.close(); + }); + + it("does not delay when the record's anchor is already past the lag window", async () => { + // The row's updatedAt matches the watermark so the tripwire stays quiet. + const anchorMs = Date.now() - 5_000; + const rows = new Map([["r1", row("r1", { tags: ["a"], updatedAtMs: anchorMs })]]); + const g = gate({ getLagMs: () => 80, marginMs: 10 }); + const { router, src } = makeRouter(rows, { replicaLag: g.replicaLag }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + const pushedAt = Date.now(); + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: anchorMs })]); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(Date.now() - pushedAt).toBeLessThan(60); + reg.close(); + }); + + it("withholds a stale row, re-hydrates, and delivers only the fresh version", async () => { + const watermark = FLOOR_MS + 10_000; + const staleRow = row("r1", { tags: ["a"], updatedAtMs: FLOOR_MS + 5_000 }); + const freshRow = row("r1", { tags: ["a"], updatedAtMs: watermark }); + const hydrateSpy = vi + .fn() + .mockResolvedValueOnce([staleRow]) + .mockResolvedValue([freshRow]); + const src = fakeSource(); + const g = gate(); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(result.rows[0].row.updatedAt.getTime()).toBe(watermark); + expect(hydrateSpy).toHaveBeenCalledTimes(2); + expect(g.observed.length).toBe(1); // the stale read fed the estimator + expect(g.outcomes).toEqual([{ outcome: "recovered", runCount: 1 }]); + reg.close(); + }); + + it("a missing row with a watermark (insert race) retries and delivers when it appears", async () => { + const watermark = FLOOR_MS + 10_000; + const freshRow = row("r1", { tags: ["a"], updatedAtMs: watermark }); + const hydrateSpy = vi + .fn() + .mockResolvedValueOnce([]) + .mockResolvedValue([freshRow]); + const src = fakeSource(); + const g = gate(); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + + const result = await wait; + expect(result.rows[0].row.id).toBe("r1"); + expect(hydrateSpy).toHaveBeenCalledTimes(2); + reg.close(); + }); + + it("delivers the stale row after exhausting retries (liveness over freshness)", async () => { + const watermark = FLOOR_MS + 10_000; + const staleRow = row("r1", { tags: ["a"], updatedAtMs: FLOOR_MS + 5_000 }); + const hydrateSpy = vi.fn().mockResolvedValue([staleRow]); + const src = fakeSource(); + const g = gate({ staleRetries: 1 }); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(result.rows[0].row.updatedAt.getTime()).toBe(FLOOR_MS + 5_000); + expect(hydrateSpy).toHaveBeenCalledTimes(2); // first pass + 1 retry + expect(g.outcomes).toEqual([{ outcome: "gave_up", runCount: 1 }]); + reg.close(); + }); + + it("after giving up, echo passes deliver the fresh row once the replica catches up", async () => { + // Recent watermark (inside the echo horizon); retries exhausted immediately. + const watermark = Date.now() - 50; + const staleRow = row("r1", { tags: ["a"], updatedAtMs: watermark - 5_000 }); + const freshRow = row("r1", { tags: ["a"], updatedAtMs: watermark }); + const hydrateSpy = vi + .fn() + .mockResolvedValueOnce([staleRow]) // first pass: stale -> gave_up, delivered anyway + .mockResolvedValue([freshRow]); // echo pass: fresh + const src = fakeSource(); + const g = gate({ staleRetries: 0 }); + const router = new EnvChangeRouter({ + source: src.source, + hydrator: { hydrateByIds: hydrateSpy }, + replicaLag: g.replicaLag, + }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + + const first = reg.waitForMatch(undefined, 2_000); + src.push("env_1", [record("r1", { tags: ["a"], updatedAtMs: watermark })]); + const staleDelivery = await first; + expect(staleDelivery.rows[0].row.updatedAt.getTime()).toBe(watermark - 5_000); + expect(g.outcomes).toEqual([{ outcome: "gave_up", runCount: 1 }]); + + // The client re-arms (as it would after consuming the stale emission); the echo + // re-hydrate delivers the fresh version through the normal pipeline. + const echoed = await reg.waitForMatch(undefined, 2_000); + expect(echoed.reason).toBe("notify"); + expect(echoed.rows[0].row.updatedAt.getTime()).toBe(watermark); + reg.close(); + }); + + it("records without a watermark bypass the tripwire entirely", async () => { + const staleLooking = row("r1", { tags: ["a"], updatedAtMs: FLOOR_MS + 5_000 }); + const rows = new Map([["r1", staleLooking]]); + const g = gate(); + const { router, src, hydrateSpy } = makeRouter(rows, { replicaLag: g.replicaLag }); + const reg = router.register("env_1", { kind: "tag", tags: ["a"] }, []); + const wait = reg.waitForMatch(undefined, 2_000); + + src.push("env_1", [record("r1", { tags: ["a"] })]); // no updatedAtMs + + const result = await wait; + expect(result.reason).toBe("notify"); + expect(hydrateSpy).toHaveBeenCalledTimes(1); + expect(g.outcomes).toEqual([]); + expect(g.observed).toEqual([]); + reg.close(); + }); +}); diff --git a/apps/webapp/test/realtime/replicaLagEstimator.test.ts b/apps/webapp/test/realtime/replicaLagEstimator.test.ts new file mode 100644 index 0000000000..7666ea28ef --- /dev/null +++ b/apps/webapp/test/realtime/replicaLagEstimator.test.ts @@ -0,0 +1,164 @@ +import { afterEach, describe, expect, it } from "vitest"; +import { + FirstSupportedReplicaLagSource, + ReplicaLagEstimator, + type ReplicaLagSource, +} from "~/services/realtime/replicaLagEstimator.server"; + +const sleep = (ms: number) => new Promise((r) => setTimeout(r, ms)); + +function source(sampleLagMs: () => Promise, name = "fake"): ReplicaLagSource { + return { name, sampleLagMs }; +} + +describe("ReplicaLagEstimator", () => { + let estimator: ReplicaLagEstimator | undefined; + + afterEach(() => { + estimator?.stop(); + estimator = undefined; + }); + + it("returns the default before any sample lands", () => { + estimator = new ReplicaLagEstimator({ + source: source(async () => undefined), + defaultLagMs: 42, + }); + expect(estimator.getLagMs()).toBe(42); + }); + + it("samples the source while touched and reports the window max", async () => { + const samples = [10, 60, 20]; + let i = 0; + estimator = new ReplicaLagEstimator({ + source: source(async () => samples[Math.min(i++, samples.length - 1)]), + sampleIntervalMs: 10, + windowMs: 5_000, + defaultLagMs: 0, + }); + estimator.touch(); + await sleep(60); + // The max sample (60) dominates even after smaller ones land. + expect(estimator.getLagMs()).toBe(60); + }); + + it("widens immediately on an observed (tripwire) lag and clamps wild values", () => { + estimator = new ReplicaLagEstimator({ + source: source(async () => 5), + defaultLagMs: 5, + maxLagMs: 1_000, + }); + estimator.noteObservedLagMs(250); + expect(estimator.getLagMs()).toBe(250); + estimator.noteObservedLagMs(99_999); + expect(estimator.getLagMs()).toBe(1_000); + }); + + it("an observed lag floors the estimate past the sample window (until its TTL)", async () => { + estimator = new ReplicaLagEstimator({ + source: source(async () => 0), // caught-up zeros, like vanilla PG between writes + sampleIntervalMs: 10, + windowMs: 30, + defaultLagMs: 0, + observedFloorTtlMs: 10_000, + }); + estimator.touch(); + estimator.noteObservedLagMs(150); + // Long past windowMs the zeros have flushed the observation out of the window, + // but the floor still carries it. + await sleep(80); + expect(estimator.getLagMs()).toBe(150); + }); + + it("stops sampling once idle and resumes on touch", async () => { + let probes = 0; + estimator = new ReplicaLagEstimator({ + source: source(async () => { + probes++; + return 1; + }), + sampleIntervalMs: 10, + idleAfterMs: 20, + }); + estimator.touch(); + await sleep(80); + const afterIdle = probes; + expect(afterIdle).toBeGreaterThan(0); + await sleep(40); + // No new samples while idle... + expect(probes).toBe(afterIdle); + // ...and touching resumes immediately. + estimator.touch(); + await sleep(15); + expect(probes).toBeGreaterThan(afterIdle); + }); + + it("survives a throwing source and keeps the last known value", async () => { + let fail = false; + estimator = new ReplicaLagEstimator({ + source: source(async () => { + if (fail) throw new Error("source down"); + return 33; + }), + sampleIntervalMs: 10, + windowMs: 30, + defaultLagMs: 0, + }); + estimator.touch(); + await sleep(25); + expect(estimator.getLagMs()).toBe(33); + fail = true; + await sleep(60); + // Window emptied, source failing — falls back to the last known sample. + expect(estimator.getLagMs()).toBe(33); + }); +}); + +describe("FirstSupportedReplicaLagSource", () => { + it("selects the first candidate whose sample succeeds and sticks with it", async () => { + let auroraCalls = 0; + let vanillaCalls = 0; + const composed = new FirstSupportedReplicaLagSource([ + source(async () => { + auroraCalls++; + throw new Error("function aurora_replica_status() does not exist"); + }, "aurora"), + source(async () => { + vanillaCalls++; + return 7; + }, "vanilla-pg"), + ]); + expect(composed.name).toBe("undetected"); + expect(await composed.sampleLagMs()).toBe(7); + expect(composed.name).toBe("vanilla-pg"); + expect(await composed.sampleLagMs()).toBe(7); + // The unsupported dialect was only probed during selection. + expect(auroraCalls).toBe(1); + expect(vanillaCalls).toBe(2); + }); + + it("degrades to never-measuring when no candidate works", async () => { + const composed = new FirstSupportedReplicaLagSource([ + source(async () => { + throw new Error("nope"); + }), + ]); + expect(await composed.sampleLagMs()).toBeUndefined(); + expect(await composed.sampleLagMs()).toBeUndefined(); + }); + + it("a transient error after selection skips the sample without unselecting", async () => { + let calls = 0; + const composed = new FirstSupportedReplicaLagSource([ + source(async () => { + calls++; + if (calls === 2) throw new Error("transient"); + return 11; + }, "flaky"), + ]); + expect(await composed.sampleLagMs()).toBe(11); + expect(await composed.sampleLagMs()).toBeUndefined(); // transient error -> skipped sample + expect(await composed.sampleLagMs()).toBe(11); // still selected + expect(composed.name).toBe("flaky"); + }); +}); diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 4cebfb35cd..1e70d94cd0 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -11,6 +11,7 @@ name: triggerdotdev-docker volumes: database-data: database-data-alt: + database-replica-data: redis-data: minio-data: clickhouse-data: @@ -44,6 +45,55 @@ services: - wal_level=logical - -c - shared_preload_libraries=pg_partman_bgw + # The webapp opens ~50 pooled connections per instance and Electric another + # ~40, so the default 100 is exhausted by one webapp + Electric alone. Raise + # it so multiple instances / load tests have headroom. + - -c + - max_connections=500 + + # Opt-in streaming read replica with configurable apply lag — a dial-a-lag rig for + # testing replica-race behavior (e.g. the realtime read-your-writes gate) locally. + # Start with: COMPOSE_PROFILES=replica pnpm run docker + # One-time primary prep (allows replication connections; additive, survives restarts): + # docker exec database bash -c 'grep -q "host replication" "$PGDATA/pg_hba.conf" || echo "host replication all all md5" >> "$PGDATA/pg_hba.conf"' + # docker exec database psql -U postgres -c "SELECT pg_reload_conf()" + # Then point the webapp at it: DATABASE_READ_REPLICA_URL=postgresql://postgres:postgres@localhost:5433/postgres + # Tune the lag via REPLICA_APPLY_DELAY (e.g. 150ms, 2s). Wipe database-replica-data to re-init. + database-replica: + container_name: ${CONTAINER_PREFIX:-}database-replica + profiles: ["replica"] + build: + context: . + dockerfile: Dockerfile.postgres + restart: always + depends_on: + - database + volumes: + - ${DB_REPLICA_VOLUME:-database-replica-data}:/var/lib/postgresql/data/ + environment: + PGPASSWORD: postgres + REPLICA_APPLY_DELAY: ${REPLICA_APPLY_DELAY:-150ms} + networks: + - app_network + ports: + - "${POSTGRES_REPLICA_HOST_PORT:-5433}:5432" + entrypoint: ["bash", "-c"] + command: + - | + set -e + if [ ! -s "$$PGDATA/PG_VERSION" ]; then + echo "initializing streaming replica from 'database'..." + mkdir -p "$$PGDATA" + chown postgres:postgres "$$PGDATA" + chmod 0700 "$$PGDATA" + until gosu postgres pg_basebackup -h database -U postgres -D "$$PGDATA" -Fp -Xs -R; do + echo "primary not ready for replication (did you run the one-time pg_hba prep above?); retrying..." + rm -rf "$$PGDATA"/* 2>/dev/null || true + sleep 2 + done + fi + # max_connections must be >= the primary's (hot-standby requirement). + exec docker-entrypoint.sh postgres -c hot_standby=on -c max_connections=500 -c "recovery_min_apply_delay=$$REPLICA_APPLY_DELAY" redis: container_name: ${CONTAINER_PREFIX:-}redis