Commit 934be00

fix(webapp): deliver realtime changes with current content when the read replica lags
The realtime feed hydrates change rows from the read replica, and that read can race the replica's apply of the very write that triggered it. Subscribers then receive the previous change's content, and an isolated final change is only corrected by the backstop poll. Publishers now stamp change records with the committed row's updatedAt (taken from writes they already perform, no extra queries), the router waits out the measured replica lag before hydrating, and a tripwire retries stale reads, feeding observations back into the lag estimate. Exhausted retries deliver anyway (liveness over freshness) while echo re-hydrates emit the fresh row through the normal diff once the replica catches up. Also: metadata updates that write nothing no longer publish a change record, and buffered parent/root metadata operations now publish when the flusher writes them.
1 parent cc9eabd commit 934be00
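
The reader-side behaviour described in the commit message (wait out the lag, re-read while the row is stale, then deliver anyway) can be sketched roughly as below. The router code is not among the files shown here, so `hydrateWithTripwire`, `Row`, `GateOptions`, and the `retryDelayMs` pause are hypothetical names and assumptions for illustration, not the commit's implementation.

```typescript
// Hypothetical shapes: the real router and row types are not shown in this commit.
interface Row {
  id: string;
  updatedAtMs: number;
}

interface GateOptions {
  maxRetries: number; // cf. REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES
  retryDelayMs: number; // assumed pause between re-reads
  sleep?: (ms: number) => Promise<void>; // injectable so tests need no real timers
}

interface HydrateResult {
  row: Row;
  stale: boolean; // true when retries ran out and the row still trails the watermark
  attempts: number; // total replica reads performed
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// A replica row is stale when it is older than the watermark stamped by the publisher.
function isStale(row: Row, watermarkMs: number): boolean {
  return row.updatedAtMs < watermarkMs;
}

async function hydrateWithTripwire(
  readFromReplica: () => Promise<Row>,
  watermarkMs: number,
  initialDelayMs: number,
  options: GateOptions
): Promise<HydrateResult> {
  const sleep = options.sleep ?? defaultSleep;

  // Wait out the estimated replica lag before the first read.
  if (initialDelayMs > 0) await sleep(initialDelayMs);

  let row = await readFromReplica();
  let attempts = 1;

  // Tripwire: re-read while the replica still serves the pre-write row.
  while (isStale(row, watermarkMs) && attempts <= options.maxRetries) {
    await sleep(options.retryDelayMs);
    row = await readFromReplica();
    attempts += 1;
  }

  // Liveness over freshness: deliver whatever was read once retries are exhausted.
  return { row, stale: isStale(row, watermarkMs), attempts };
}
```

With `maxRetries: 3` this performs at most four replica reads per wake. A caller that gets `stale: true` still delivers the row and relies on a later re-hydrate to emit the fresh content.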

12 files changed

Lines changed: 987 additions & 31 deletions
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: fix
+---
+
+Realtime feed reads now wait out measured read-replica lag and retry stale reads, so subscribers receive each change's current content instead of trailing one change behind when a read replica races the write.

apps/webapp/app/env.server.ts

Lines changed: 22 additions & 0 deletions
@@ -338,6 +338,28 @@ const EnvironmentSchema = z
     // TTL/size of the per-org realtimeBackend flag cache used to pick the serving backend.
     REALTIME_BACKEND_FLAG_CACHE_TTL_MS: z.coerce.number().int().default(30_000),
     REALTIME_BACKEND_FLAG_CACHE_MAX_ENTRIES: z.coerce.number().int().default(50_000),
+    // "1" enables the read-your-writes gate: wake hydrates wait out the measured replica lag
+    // (anchored to the change record's updatedAtMs) and stale reads are retried.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_GATE_ENABLED: z.string().default("1"),
+    // Reader-side lag probe cadence while the router is active; probing pauses when idle.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_SAMPLE_INTERVAL_MS: z.coerce.number().int().default(250),
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_IDLE_AFTER_MS: z.coerce.number().int().default(30_000),
+    // The lag estimate is the max sample inside this window (spikes widen it immediately).
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS: z.coerce.number().int().default(5_000),
+    // Estimate before the first sample lands (and the floor when probing is unavailable).
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS: z.coerce.number().int().default(30),
+    // Safety margin (clock skew + scheduling) added on top of the lag estimate.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS: z.coerce.number().int().default(10),
+    // Hard cap on any single gate delay — a sick replica degrades freshness, never liveness.
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS: z.coerce.number().int().default(1_000),
+    // Re-hydrate attempts for rows the tripwire still finds stale after the delay.
+    REALTIME_BACKEND_NATIVE_STALE_HYDRATE_RETRIES: z.coerce.number().int().default(3),
+    // How long a tripwire-observed staleness floors the lag estimate (vanilla-PG replicas
+    // can't measure mid-apply lag, so observations carry the estimate between races).
+    REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS: z.coerce
+      .number()
+      .int()
+      .default(60_000),

     PUBSUB_REDIS_HOST: z
       .string()
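
One way the settings above might combine into a single gate delay is sketched here. The `LagEstimator` class and its method names are hypothetical, and falling back to the default whenever the sample window is empty is an assumption; the actual estimator is not among the files shown.

```typescript
// Hypothetical estimator; field names map onto the env vars in the hunk above.
interface LagConfig {
  windowMs: number; // REALTIME_BACKEND_NATIVE_REPLICA_LAG_WINDOW_MS
  defaultMs: number; // REALTIME_BACKEND_NATIVE_REPLICA_LAG_DEFAULT_MS
  marginMs: number; // REALTIME_BACKEND_NATIVE_REPLICA_LAG_MARGIN_MS
  maxDelayMs: number; // REALTIME_BACKEND_NATIVE_REPLICA_LAG_MAX_DELAY_MS
  observedFloorTtlMs: number; // REALTIME_BACKEND_NATIVE_REPLICA_LAG_OBSERVED_FLOOR_TTL_MS
}

interface LagPoint {
  atMs: number;
  lagMs: number;
}

class LagEstimator {
  private readonly config: LagConfig;
  private samples: LagPoint[] = [];
  private observedFloor: LagPoint | undefined;

  constructor(config: LagConfig) {
    this.config = config;
  }

  // A probe measurement of replica lag taken at nowMs.
  recordSample(lagMs: number, nowMs: number): void {
    this.samples.push({ atMs: nowMs, lagMs });
  }

  // Staleness seen by the tripwire; floors the estimate until its TTL expires.
  recordObservedStaleness(lagMs: number, nowMs: number): void {
    this.observedFloor = { atMs: nowMs, lagMs };
  }

  // Max sample inside the window, or the default when the window is empty (assumption).
  estimate(nowMs: number): number {
    this.samples = this.samples.filter((s) => nowMs - s.atMs <= this.config.windowMs);
    let lagMs =
      this.samples.length === 0
        ? this.config.defaultMs
        : Math.max(...this.samples.map((s) => s.lagMs));
    const floor = this.observedFloor;
    if (floor !== undefined && nowMs - floor.atMs <= this.config.observedFloorTtlMs) {
      lagMs = Math.max(lagMs, floor.lagMs);
    }
    return lagMs;
  }

  // Time left until the replica should have applied the write, clamped to [0, maxDelayMs].
  gateDelay(updatedAtMs: number, nowMs: number): number {
    const readyAtMs = updatedAtMs + this.estimate(nowMs) + this.config.marginMs;
    return Math.min(this.config.maxDelayMs, Math.max(0, readyAtMs - nowMs));
  }
}
```

Anchoring the delay to `updatedAtMs` rather than to the wake time means a record that arrives late waits less, and one that arrives after the lag has already elapsed does not wait at all.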

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 26 additions & 8 deletions
@@ -100,7 +100,20 @@ export async function routeOperationsToRun(
   const [error, result] = await tryCatch(
     updateMetadataService.call(targetRunId, { operations }, env)
   );
-  if (!error && result !== undefined) return;
+  if (!error && result !== undefined) {
+    // The parent/root run changed too — wake its live feeds (only when something was
+    // actually written here; buffered writes publish from the flusher).
+    if (result.updatedAtMs !== undefined) {
+      publishChangeRecord({
+        runId: result.runId,
+        envId: env.id,
+        tags: result.runTags,
+        batchId: result.batchId,
+        updatedAtMs: result.updatedAtMs,
+      });
+    }
+    return;
+  }

   if (error) {
     // PG threw — auxiliary op, stay best-effort and don't surface this
@@ -186,13 +199,18 @@ const { action } = createActionApiRoute(
     }
     if (pgResult) {
       // Reflect metadata.set() on a live feed before the next lifecycle event. Publish the
-      // internal id (the router keys single-run feeds by it, not the friendly id from the URL).
-      publishChangeRecord({
-        runId: pgResult.runId,
-        envId: env.id,
-        tags: pgResult.runTags,
-        batchId: pgResult.batchId,
-      });
+      // internal id (the router keys single-run feeds by it, not the friendly id from the
+      // URL) with the committed updatedAt as the read-your-writes watermark. No write
+      // (no-op body, or ops buffered for the flusher) means nothing to announce here.
+      if (pgResult.updatedAtMs !== undefined) {
+        publishChangeRecord({
+          runId: pgResult.runId,
+          envId: env.id,
+          tags: pgResult.runTags,
+          batchId: pgResult.batchId,
+          updatedAtMs: pgResult.updatedAtMs,
+        });
+      }
       return json({ metadata: pgResult.metadata }, { status: 200 });
     }
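
Both hunks in this file apply the same rule: publish only when `updatedAtMs` is defined, because an undefined value means no row was written in this request. A minimal sketch of that rule as a pure helper follows; `toChangeRecord`, `publishIfWritten`, and the two interfaces are hypothetical names that only mirror the fields visible in the diff.

```typescript
// Hypothetical types mirroring the fields passed to publishChangeRecord in the diff.
interface ChangeRecord {
  runId: string;
  envId: string;
  tags: string[];
  batchId: string | null;
  updatedAtMs: number;
}

interface MetadataUpdateResult {
  runId: string;
  runTags: string[];
  batchId: string | null;
  updatedAtMs?: number; // undefined when nothing was written in this request
}

// Builds a change record only when a write actually happened.
function toChangeRecord(result: MetadataUpdateResult, envId: string): ChangeRecord | undefined {
  if (result.updatedAtMs === undefined) return undefined;
  return {
    runId: result.runId,
    envId,
    tags: result.runTags,
    batchId: result.batchId,
    updatedAtMs: result.updatedAtMs,
  };
}

// Publishes through the supplied function and reports whether anything was sent.
function publishIfWritten(
  result: MetadataUpdateResult,
  envId: string,
  publish: (record: ChangeRecord) => void
): boolean {
  const record = toChangeRecord(result, envId);
  if (record === undefined) return false;
  publish(record);
  return true;
}
```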

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 4 additions & 2 deletions
@@ -84,20 +84,22 @@ export async function action({ request, params }: ActionFunctionArgs) {
     if (newTags.length === 0) {
       return json({ message: "No new tags to add" }, { status: 200 });
     }
-    await prisma.taskRun.update({
+    const updated = await prisma.taskRun.update({
       where: {
         id: taskRun.id,
         runtimeEnvironmentId: env.id,
       },
       data: { runTags: { push: newTags } },
+      select: { updatedAt: true },
     });
     // Publish a run-changed record with the NEW tag set so tag feeds reindex
-    // (no-op unless enabled).
+    // (no-op unless enabled). updatedAt is the read-your-writes watermark.
     publishChangeRecord({
       runId: taskRun.id,
       envId: env.id,
       tags: existing.concat(newTags),
       batchId: taskRun.batchId,
+      updatedAtMs: updated.updatedAt.getTime(),
     });
     return json({ message: `Successfully set ${newTags.length} new tags.` }, { status: 200 });
   },

apps/webapp/app/services/metadata/updateMetadata.server.ts

Lines changed: 65 additions & 15 deletions
@@ -30,6 +30,16 @@ export type UpdateMetadataServiceOptions = {
   maximumSize?: number;
   logger?: Logger;
   logLevel?: LogLevel;
+  /** Called after the batched flusher writes a run's buffered operations, with everything
+   * a realtime change record needs — buffered (parent/root) updates otherwise never wake
+   * live feeds. */
+  onRunFlushed?: (run: {
+    runId: string;
+    environmentId: string;
+    tags: string[];
+    batchId: string | null;
+    updatedAtMs: number;
+  }) => void;
   // Testing hooks
   onBeforeUpdate?: (runId: string) => Promise<void>;
   onAfterRead?: (runId: string, metadataVersion: number) => Promise<void>;
@@ -172,12 +182,20 @@ export class UpdateMetadataService {
     operations: BufferedRunMetadataChangeOperation[]
   ) => {
     return Effect.gen(this, function* (_) {
-      // Fetch current run
+      // Fetch current run (+ the realtime membership keys, so a flush can publish)
       const run = yield* _(
         Effect.tryPromise(() =>
           this._prisma.taskRun.findFirst({
             where: { id: runId },
-            select: { id: true, metadata: true, metadataType: true, metadataVersion: true },
+            select: {
+              id: true,
+              metadata: true,
+              metadataType: true,
+              metadataVersion: true,
+              runtimeEnvironmentId: true,
+              runTags: true,
+              batchId: true,
+            },
           })
         )
       );
@@ -237,6 +255,9 @@ export class UpdateMetadataService {
         yield* _(Effect.tryPromise(() => this.options.onBeforeUpdate!(runId)));
       }

+      // Stamp updatedAt explicitly so the realtime publish can carry the exact committed
+      // value without a follow-up read (updateMany can't RETURNING).
+      const writeTime = new Date();
       const result = yield* _(
         Effect.tryPromise(() =>
           this._prisma.taskRun.updateMany({
@@ -247,6 +268,7 @@ export class UpdateMetadataService {
             data: {
               metadata: newMetadataPacket.data,
               metadataVersion: { increment: 1 },
+              updatedAt: writeTime,
             },
           })
         )
@@ -262,6 +284,16 @@ export class UpdateMetadataService {
         return yield* _(Effect.fail(new Error("Optimistic lock failed")));
       }

+      yield* Effect.sync(() => {
+        this.options.onRunFlushed?.({
+          runId,
+          environmentId: run.runtimeEnvironmentId,
+          tags: run.runTags,
+          batchId: run.batchId,
+          updatedAtMs: writeTime.getTime(),
+        });
+      });
+
       return result;
     });
   };
@@ -346,7 +378,7 @@ export class UpdateMetadataService {
       this.#ingestRunOperations(taskRun.rootTaskRun?.id ?? taskRun.id, body.rootOperations);
     }

-    const newMetadata = await this.#updateRunMetadata({
+    const result = await this.#updateRunMetadata({
       runId: taskRun.id,
       body,
       existingMetadata: {
@@ -356,11 +388,14 @@ export class UpdateMetadataService {
     });

     return {
-      metadata: newMetadata,
+      metadata: result?.metadata,
       // Internal id + membership keys, so callers can publish full realtime records the router routes by index.
       runId: taskRun.id,
       batchId: taskRun.batchId,
       runTags: taskRun.runTags,
+      // The committed row's updatedAt — the realtime watermark. Undefined when nothing was
+      // written here (no-op, or buffered for the flusher, which publishes itself).
+      updatedAtMs: result?.updatedAtMs,
     };
   }

@@ -372,15 +407,18 @@ export class UpdateMetadataService {
     runId: string;
     body: UpdateMetadataRequestBody;
     existingMetadata: IOPacket;
-  }) {
+  }): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
     if (Array.isArray(body.operations)) {
       return this.#updateRunMetadataWithOperations(runId, body.operations);
     } else {
       return this.#updateRunMetadataDirectly(runId, body, existingMetadata);
     }
   }

-  async #updateRunMetadataWithOperations(runId: string, operations: RunMetadataChangeOperation[]) {
+  async #updateRunMetadataWithOperations(
+    runId: string,
+    operations: RunMetadataChangeOperation[]
+  ): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
     const MAX_RETRIES = 3;
     let attempts = 0;

@@ -408,9 +446,9 @@ export class UpdateMetadataService {
       // Apply operations to the current metadata
       const applyResults = applyMetadataOperations(currentMetadata, operations);

-      // If no operations were applied, return the current metadata
+      // If no operations were applied, return the current metadata (nothing written)
       if (applyResults.unappliedOperations.length === operations.length) {
-        return currentMetadata;
+        return { metadata: currentMetadata };
       }

       const newMetadataPacket = handleMetadataPacket(
@@ -428,7 +466,9 @@ export class UpdateMetadataService {
         await this.options.onBeforeUpdate(runId);
       }

-      // Update with optimistic locking
+      // Update with optimistic locking; updatedAt stamped explicitly so the caller can
+      // publish the exact committed watermark without a follow-up read.
+      const writeTime = new Date();
       const result = await this._prisma.taskRun.updateMany({
         where: {
           id: runId,
@@ -440,6 +480,7 @@ export class UpdateMetadataService {
           metadataVersion: {
             increment: 1,
           },
+          updatedAt: writeTime,
         },
       });

@@ -454,9 +495,10 @@ export class UpdateMetadataService {
         }

         // If this was our last attempt, buffer the operations and return optimistically
+        // (no watermark — the flusher writes later and publishes itself).
         if (attempts === MAX_RETRIES) {
           this.#ingestRunOperations(runId, operations);
-          return applyResults.newMetadata;
+          return { metadata: applyResults.newMetadata };
         }

         // Otherwise sleep and try again
@@ -474,8 +516,10 @@ export class UpdateMetadataService {
       }

       // Success! Return the new metadata
-      return applyResults.newMetadata;
+      return { metadata: applyResults.newMetadata, updatedAtMs: writeTime.getTime() };
     }
+
+    return { metadata: undefined };
   }

   // Checks to see if a run is updatable
@@ -493,17 +537,19 @@ export class UpdateMetadataService {
     runId: string,
     body: UpdateMetadataRequestBody,
     existingMetadata: IOPacket
-  ) {
+  ): Promise<{ metadata: Record<string, unknown> | undefined; updatedAtMs?: number }> {
     const metadataPacket = handleMetadataPacket(
       body.metadata,
       "application/json",
       this.maximumSize
     );

     if (!metadataPacket) {
-      return {};
+      return { metadata: {} };
     }

+    let updatedAtMs: number | undefined;
+
     if (
       metadataPacket.data !== "{}" ||
       (existingMetadata.data && metadataPacket.data !== existingMetadata.data)
@@ -515,7 +561,9 @@ export class UpdateMetadataService {
         });
       }

-      // Update the metadata without version check
+      // Update the metadata without version check; updatedAt stamped explicitly so the
+      // caller can publish the exact committed watermark.
+      const writeTime = new Date();
       await this._prisma.taskRun.update({
         where: {
           id: runId,
@@ -526,12 +574,14 @@ export class UpdateMetadataService {
           metadataVersion: {
             increment: 1,
           },
+          updatedAt: writeTime,
         },
       });
+      updatedAtMs = writeTime.getTime();
     }

     const newMetadata = await parsePacket(metadataPacket);
-    return newMetadata;
+    return { metadata: newMetadata, updatedAtMs };
   }

   #ingestRunOperations(runId: string, operations: RunMetadataChangeOperation[]) {
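
The write path in this file follows one pattern throughout: stamp `updatedAt` in the application, attempt a version-checked write, return the stamp as the watermark on success, and return no watermark when the operations are handed to the flusher. A simplified, synchronous sketch of that pattern against an in-memory store is given below. `InMemoryRuns`, `updateWithOptimisticLock`, and the `beforeWrite` hook are hypothetical stand-ins for Prisma and the service internals, and the sleep between retries is omitted.

```typescript
// Hypothetical in-memory stand-in for the task run table.
interface RunRow {
  metadata: Record<string, unknown>;
  metadataVersion: number;
  updatedAt: Date;
}

interface WriteOutcome {
  metadata: Record<string, unknown> | undefined;
  updatedAtMs?: number; // present only when this call committed a write
}

class InMemoryRuns {
  readonly rows = new Map<string, RunRow>();

  // Mimics a version-predicated updateMany: returns the number of rows written.
  updateIfVersion(
    id: string,
    expectedVersion: number,
    metadata: Record<string, unknown>,
    updatedAt: Date
  ): number {
    const row = this.rows.get(id);
    if (row === undefined || row.metadataVersion !== expectedVersion) return 0;
    this.rows.set(id, { metadata, metadataVersion: expectedVersion + 1, updatedAt });
    return 1;
  }
}

function updateWithOptimisticLock(
  store: InMemoryRuns,
  runId: string,
  patch: Record<string, unknown>,
  bufferForFlusher: (runId: string, patch: Record<string, unknown>) => void,
  beforeWrite: () => void = () => {}, // test hook to simulate a concurrent writer
  maxRetries = 3
): WriteOutcome {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    const current = store.rows.get(runId);
    if (current === undefined) return { metadata: undefined };

    const next = { ...current.metadata, ...patch };
    beforeWrite();

    // Stamp the timestamp ourselves so the committed value is known without a re-read.
    const writeTime = new Date();
    const written = store.updateIfVersion(runId, current.metadataVersion, next, writeTime);
    if (written === 1) return { metadata: next, updatedAtMs: writeTime.getTime() };

    // Last attempt lost the race: buffer for the flusher and return without a watermark.
    if (attempt === maxRetries) {
      bufferForFlusher(runId, patch);
      return { metadata: next };
    }
  }
  return { metadata: undefined };
}
```

Because the stamp is generated by the application rather than the database, the value returned to the caller is exactly the value stored in the row, which is what lets a reader compare a replica row against the watermark.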

apps/webapp/app/services/metadata/updateMetadataInstance.server.ts

Lines changed: 12 additions & 0 deletions
@@ -2,6 +2,7 @@ import { singleton } from "~/utils/singleton";
 import { env } from "~/env.server";
 import { UpdateMetadataService } from "./updateMetadata.server";
 import { prisma } from "~/db.server";
+import { publishChangeRecord } from "~/services/realtime/runChangeNotifierInstance.server";

 export const updateMetadataService = singleton(
   "update-metadata-service",
@@ -13,5 +14,16 @@ export const updateMetadataService = singleton(
       flushLoggingEnabled: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1",
       maximumSize: env.TASK_RUN_METADATA_MAXIMUM_SIZE,
       logLevel: env.BATCH_METADATA_OPERATIONS_FLUSH_LOGGING_ENABLED === "1" ? "debug" : "info",
+      // Buffered (parent/root) operations land via the flusher, not the caller's request —
+      // publish here so those changes wake live feeds too (no-op when the backend is off).
+      onRunFlushed: (run) => {
+        publishChangeRecord({
+          runId: run.runId,
+          envId: run.environmentId,
+          tags: run.tags,
+          batchId: run.batchId,
+          updatedAtMs: run.updatedAtMs,
+        });
+      },
     })
 );
